diff --git a/.gitignore b/.gitignore index 1ce9c03db4..2ad291abd9 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ **/venv/ **/__pycache__/** private.* +.venv \ No newline at end of file diff --git a/core/bifrost.go b/core/bifrost.go index c8bc5e0e3b..9eeaac54be 100644 --- a/core/bifrost.go +++ b/core/bifrost.go @@ -195,6 +195,82 @@ func (bifrost *Bifrost) prepareProvider(providerKey schemas.ModelProvider, confi return nil } +// UpdateProviderConcurrency dynamically updates the queue size and concurrency for an existing provider. +// This method gracefully stops existing workers, creates a new queue with updated settings, +// and starts new workers with the updated concurrency configuration. +// +// Parameters: +// - providerKey: The provider to update +// +// Returns: +// - error: Any error that occurred during the update process +// +// Note: This operation will temporarily pause request processing for the specified provider +// while the transition occurs. In-flight requests will complete before workers are stopped. +func (bifrost *Bifrost) UpdateProviderConcurrency(providerKey schemas.ModelProvider) error { + bifrost.logger.Info(fmt.Sprintf("Updating concurrency configuration for provider %s", providerKey)) + + // Get the updated configuration from the account + providerConfig, err := bifrost.account.GetConfigForProvider(providerKey) + if err != nil { + return fmt.Errorf("failed to get updated config for provider %s: %v", providerKey, err) + } + + // Check if provider currently exists + oldQueue, exists := bifrost.requestQueues[providerKey] + if !exists { + bifrost.logger.Debug(fmt.Sprintf("Provider %s not currently active, initializing with new configuration", providerKey)) + // If provider doesn't exist, just prepare it with new configuration + return bifrost.prepareProvider(providerKey, providerConfig) + } + + // Check if the provider has any keys (skip keyless providers) + if providerRequiresKey(providerKey) { + keys, err := bifrost.account.GetKeysForProvider(providerKey) + if err != nil || len(keys) == 0 { + return fmt.Errorf("failed to get keys for provider %s: %v", providerKey, err) + } + } + + bifrost.logger.Debug(fmt.Sprintf("Gracefully stopping existing workers for provider %s", providerKey)) + + // Step 1: Close the existing queue to signal workers to stop processing new requests + close(oldQueue) + + // Step 2: Wait for all existing workers to finish processing in-flight requests + if waitGroup, exists := bifrost.waitGroups[providerKey]; exists { + waitGroup.Wait() + bifrost.logger.Debug(fmt.Sprintf("All workers for provider %s have stopped", providerKey)) + } + + // Step 3: Create new queue with updated buffer size + newQueue := make(chan ChannelMessage, providerConfig.ConcurrencyAndBufferSize.BufferSize) + bifrost.requestQueues[providerKey] = newQueue + + // Step 4: Create new wait group for the updated workers + bifrost.waitGroups[providerKey] = &sync.WaitGroup{} + + // Step 5: Create provider instance + provider, err := bifrost.createProviderFromProviderKey(providerKey, providerConfig) + if err != nil { + return fmt.Errorf("failed to create provider instance for %s: %v", providerKey, err) + } + + // Step 6: Start new workers with updated concurrency + bifrost.logger.Debug(fmt.Sprintf("Starting %d new workers for provider %s with buffer size %d", + providerConfig.ConcurrencyAndBufferSize.Concurrency, + providerKey, + providerConfig.ConcurrencyAndBufferSize.BufferSize)) + + for range providerConfig.ConcurrencyAndBufferSize.Concurrency { + bifrost.waitGroups[providerKey].Add(1) + go bifrost.requestWorker(provider, newQueue) + } + + bifrost.logger.Info(fmt.Sprintf("Successfully updated concurrency configuration for provider %s", providerKey)) + return nil +} + // Init initializes a new Bifrost instance with the given configuration. // It sets up the account, plugins, object pools, and initializes providers. // Returns an error if initialization fails. diff --git a/transports/bifrost-http/handlers/completions.go b/transports/bifrost-http/handlers/completions.go new file mode 100644 index 0000000000..bd6fa7c514 --- /dev/null +++ b/transports/bifrost-http/handlers/completions.go @@ -0,0 +1,138 @@ +// Package handlers provides HTTP request handlers for the Bifrost HTTP transport. +// This file contains completion request handlers for text and chat completions. +package handlers + +import ( + "encoding/json" + "fmt" + + "github.com/fasthttp/router" + bifrost "github.com/maximhq/bifrost/core" + "github.com/maximhq/bifrost/core/schemas" + "github.com/maximhq/bifrost/transports/bifrost-http/lib" + "github.com/valyala/fasthttp" +) + +// CompletionHandler manages HTTP requests for completion operations +type CompletionHandler struct { + client *bifrost.Bifrost + logger schemas.Logger +} + +// NewCompletionHandler creates a new completion handler instance +func NewCompletionHandler(client *bifrost.Bifrost, logger schemas.Logger) *CompletionHandler { + return &CompletionHandler{ + client: client, + logger: logger, + } +} + +// CompletionRequest represents a request for either text or chat completion +type CompletionRequest struct { + Provider schemas.ModelProvider `json:"provider"` // The AI model provider to use + Messages []schemas.BifrostMessage `json:"messages"` // Chat messages (for chat completion) + Text string `json:"text"` // Text input (for text completion) + Model string `json:"model"` // Model to use + Params *schemas.ModelParameters `json:"params"` // Additional model parameters + Fallbacks []schemas.Fallback `json:"fallbacks"` // Fallback providers and models +} + +type CompletionType string + +const ( + CompletionTypeText CompletionType = "text" + CompletionTypeChat CompletionType = "chat" +) + +// RegisterRoutes registers all completion-related routes +func (h *CompletionHandler) RegisterRoutes(r *router.Router) { + // Completion endpoints + r.POST("/v1/text/completions", h.TextCompletion) + r.POST("/v1/chat/completions", h.ChatCompletion) +} + +// TextCompletion handles POST /v1/text/completions - Process text completion requests +func (h *CompletionHandler) TextCompletion(ctx *fasthttp.RequestCtx) { + h.handleCompletion(ctx, CompletionTypeText) +} + +// ChatCompletion handles POST /v1/chat/completions - Process chat completion requests +func (h *CompletionHandler) ChatCompletion(ctx *fasthttp.RequestCtx) { + h.handleCompletion(ctx, CompletionTypeChat) +} + +// handleCompletion processes both text and chat completion requests +// It handles request parsing, validation, and response formatting +func (h *CompletionHandler) handleCompletion(ctx *fasthttp.RequestCtx, completionType CompletionType) { + var req CompletionRequest + if err := json.Unmarshal(ctx.PostBody(), &req); err != nil { + SendError(ctx, fasthttp.StatusBadRequest, fmt.Sprintf("Invalid request format: %v", err), h.logger) + return + } + + // Validate required fields + if req.Provider == "" { + SendError(ctx, fasthttp.StatusBadRequest, "Provider is required", h.logger) + return + } + + if req.Model == "" { + SendError(ctx, fasthttp.StatusBadRequest, "Model is required", h.logger) + return + } + + // Create BifrostRequest + bifrostReq := &schemas.BifrostRequest{ + Provider: req.Provider, + Model: req.Model, + Params: req.Params, + Fallbacks: req.Fallbacks, + } + + // Validate and set input based on completion type + switch completionType { + case CompletionTypeText: + if req.Text == "" { + SendError(ctx, fasthttp.StatusBadRequest, "Text is required for text completion", h.logger) + return + } + bifrostReq.Input = schemas.RequestInput{ + TextCompletionInput: &req.Text, + } + case CompletionTypeChat: + if len(req.Messages) == 0 { + SendError(ctx, fasthttp.StatusBadRequest, "Messages array is required for chat completion", h.logger) + return + } + bifrostReq.Input = schemas.RequestInput{ + ChatCompletionInput: &req.Messages, + } + } + + // Convert context + bifrostCtx := lib.ConvertToBifrostContext(ctx) + if bifrostCtx == nil { + SendError(ctx, fasthttp.StatusInternalServerError, "Failed to convert context", h.logger) + return + } + + // Execute request + var resp *schemas.BifrostResponse + var bifrostErr *schemas.BifrostError + + switch completionType { + case CompletionTypeText: + resp, bifrostErr = h.client.TextCompletionRequest(*bifrostCtx, bifrostReq) + case CompletionTypeChat: + resp, bifrostErr = h.client.ChatCompletionRequest(*bifrostCtx, bifrostReq) + } + + // Handle response + if bifrostErr != nil { + SendBifrostError(ctx, bifrostErr, h.logger) + return + } + + // Send successful response + SendJSON(ctx, resp, h.logger) +} diff --git a/transports/bifrost-http/handlers/integrations.go b/transports/bifrost-http/handlers/integrations.go new file mode 100644 index 0000000000..fac262f0e3 --- /dev/null +++ b/transports/bifrost-http/handlers/integrations.go @@ -0,0 +1,41 @@ +// Package handlers provides HTTP request handlers for the Bifrost HTTP transport. +// This file contains integration management handlers for AI provider integrations. +package handlers + +import ( + "github.com/fasthttp/router" + bifrost "github.com/maximhq/bifrost/core" + "github.com/maximhq/bifrost/transports/bifrost-http/integrations" + "github.com/maximhq/bifrost/transports/bifrost-http/integrations/anthropic" + "github.com/maximhq/bifrost/transports/bifrost-http/integrations/genai" + "github.com/maximhq/bifrost/transports/bifrost-http/integrations/litellm" + "github.com/maximhq/bifrost/transports/bifrost-http/integrations/openai" +) + +// IntegrationHandler manages HTTP requests for AI provider integrations +type IntegrationHandler struct { + extensions []integrations.ExtensionRouter +} + +// NewIntegrationHandler creates a new integration handler instance +func NewIntegrationHandler(client *bifrost.Bifrost) *IntegrationHandler { + // Initialize all available integration routers + extensions := []integrations.ExtensionRouter{ + genai.NewGenAIRouter(client), + openai.NewOpenAIRouter(client), + anthropic.NewAnthropicRouter(client), + litellm.NewLiteLLMRouter(client), + } + + return &IntegrationHandler{ + extensions: extensions, + } +} + +// RegisterRoutes registers all integration routes for AI provider compatibility endpoints +func (h *IntegrationHandler) RegisterRoutes(r *router.Router) { + // Register routes for each integration extension + for _, extension := range h.extensions { + extension.RegisterRoutes(r) + } +} diff --git a/transports/bifrost-http/handlers/mcp.go b/transports/bifrost-http/handlers/mcp.go new file mode 100644 index 0000000000..c781b6eac9 --- /dev/null +++ b/transports/bifrost-http/handlers/mcp.go @@ -0,0 +1,71 @@ +// Package handlers provides HTTP request handlers for the Bifrost HTTP transport. +// This file contains MCP (Model Context Protocol) tool execution handlers. +package handlers + +import ( + "encoding/json" + "fmt" + + "github.com/fasthttp/router" + bifrost "github.com/maximhq/bifrost/core" + "github.com/maximhq/bifrost/core/schemas" + "github.com/maximhq/bifrost/transports/bifrost-http/lib" + "github.com/valyala/fasthttp" +) + +// MCPHandler manages HTTP requests for MCP tool operations +type MCPHandler struct { + client *bifrost.Bifrost + logger schemas.Logger +} + +// NewMCPHandler creates a new MCP handler instance +func NewMCPHandler(client *bifrost.Bifrost, logger schemas.Logger) *MCPHandler { + return &MCPHandler{ + client: client, + logger: logger, + } +} + +// RegisterRoutes registers all MCP-related routes +func (h *MCPHandler) RegisterRoutes(r *router.Router) { + // MCP tool execution endpoint + r.POST("/v1/mcp/tool/execute", h.ExecuteTool) +} + +// ExecuteTool handles POST /v1/mcp/tool/execute - Execute MCP tool +func (h *MCPHandler) ExecuteTool(ctx *fasthttp.RequestCtx) { + var req schemas.ToolCall + if err := json.Unmarshal(ctx.PostBody(), &req); err != nil { + SendError(ctx, fasthttp.StatusBadRequest, fmt.Sprintf("Invalid request format: %v", err), h.logger) + return + } + + // Validate required fields + if req.Function.Name == nil || *req.Function.Name == "" { + SendError(ctx, fasthttp.StatusBadRequest, "Tool function name is required", h.logger) + return + } + + // Convert context + bifrostCtx := lib.ConvertToBifrostContext(ctx) + if bifrostCtx == nil { + SendError(ctx, fasthttp.StatusInternalServerError, "Failed to convert context", h.logger) + return + } + + // Execute MCP tool + resp, bifrostErr := h.client.ExecuteMCPTool(*bifrostCtx, req) + if bifrostErr != nil { + SendBifrostError(ctx, bifrostErr, h.logger) + return + } + + // Send successful response + ctx.SetStatusCode(fasthttp.StatusOK) + ctx.SetContentType("application/json") + if encodeErr := json.NewEncoder(ctx).Encode(resp); encodeErr != nil { + h.logger.Warn(fmt.Sprintf("Failed to encode response: %v", encodeErr)) + SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Failed to encode response: %v", encodeErr), h.logger) + } +} diff --git a/transports/bifrost-http/handlers/providers.go b/transports/bifrost-http/handlers/providers.go new file mode 100644 index 0000000000..9d34e0cc43 --- /dev/null +++ b/transports/bifrost-http/handlers/providers.go @@ -0,0 +1,410 @@ +// Package handlers provides HTTP request handlers for the Bifrost HTTP transport. +// This file contains all provider management functionality including CRUD operations. +package handlers + +import ( + "encoding/json" + "fmt" + + "github.com/fasthttp/router" + bifrost "github.com/maximhq/bifrost/core" + "github.com/maximhq/bifrost/core/schemas" + "github.com/maximhq/bifrost/core/schemas/meta" + "github.com/maximhq/bifrost/transports/bifrost-http/lib" + "github.com/valyala/fasthttp" +) + +// ProviderHandler manages HTTP requests for provider operations +type ProviderHandler struct { + store *lib.ConfigStore + client *bifrost.Bifrost + logger schemas.Logger +} + +// NewProviderHandler creates a new provider handler instance +func NewProviderHandler(store *lib.ConfigStore, client *bifrost.Bifrost, logger schemas.Logger) *ProviderHandler { + return &ProviderHandler{ + store: store, + client: client, + logger: logger, + } +} + +// AddProviderRequest represents the request body for adding a new provider +type AddProviderRequest struct { + Provider schemas.ModelProvider `json:"provider"` + Keys []schemas.Key `json:"keys"` // API keys for the provider + NetworkConfig *schemas.NetworkConfig `json:"network_config,omitempty"` // Network-related settings + MetaConfig *map[string]interface{} `json:"meta_config,omitempty"` // Provider-specific metadata + ConcurrencyAndBufferSize *schemas.ConcurrencyAndBufferSize `json:"concurrency_and_buffer_size,omitempty"` // Concurrency settings +} + +// UpdateProviderRequest represents the request body for updating a provider +type UpdateProviderRequest struct { + Keys *[]schemas.Key `json:"keys,omitempty"` // API keys for the provider + NetworkConfig *schemas.NetworkConfig `json:"network_config,omitempty"` // Network-related settings + MetaConfig *map[string]interface{} `json:"meta_config,omitempty"` // Provider-specific metadata + ConcurrencyAndBufferSize *schemas.ConcurrencyAndBufferSize `json:"concurrency_and_buffer_size,omitempty"` // Concurrency settings +} + +// ProviderResponse represents the response for provider operations +type ProviderResponse struct { + Provider schemas.ModelProvider `json:"provider"` + Config *lib.ProviderConfig `json:"config,omitempty"` + Status string `json:"status"` + Message string `json:"message,omitempty"` +} + +// ListProvidersResponse represents the response for listing all providers +type ListProvidersResponse struct { + Providers []ProviderResponse `json:"providers"` + Total int `json:"total"` +} + +// ErrorResponse represents an error response +type ErrorResponse struct { + Error string `json:"error"` + Message string `json:"message,omitempty"` +} + +// RegisterRoutes registers all provider management routes +func (h *ProviderHandler) RegisterRoutes(r *router.Router) { + // Provider CRUD operations + r.GET("/providers", h.ListProviders) + r.GET("/providers/{provider}", h.GetProvider) + r.POST("/providers", h.AddProvider) + r.PUT("/providers/{provider}", h.UpdateProvider) + r.DELETE("/providers/{provider}", h.DeleteProvider) + + // Configuration persistence + r.POST("/config/save", h.SaveConfig) +} + +// ListProviders handles GET /providers - List all providers +func (h *ProviderHandler) ListProviders(ctx *fasthttp.RequestCtx) { + providers, err := h.store.GetAllProviders() + if err != nil { + SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Failed to get providers: %v", err), h.logger) + return + } + + var providerResponses []ProviderResponse + for _, provider := range providers { + config, err := h.store.GetProviderConfig(provider) + if err != nil { + h.logger.Warn(fmt.Sprintf("Failed to get config for provider %s: %v", provider, err)) + // Include provider even if config fetch fails + providerResponses = append(providerResponses, ProviderResponse{ + Provider: provider, + Status: "error", + Message: fmt.Sprintf("Failed to get config: %v", err), + }) + continue + } + + providerResponses = append(providerResponses, ProviderResponse{ + Provider: provider, + Config: config, + Status: "active", + }) + } + + response := ListProvidersResponse{ + Providers: providerResponses, + Total: len(providerResponses), + } + + SendJSON(ctx, response, h.logger) +} + +// GetProvider handles GET /providers/{provider} - Get specific provider +func (h *ProviderHandler) GetProvider(ctx *fasthttp.RequestCtx) { + provider, err := getProviderFromCtx(ctx) + if err != nil { + SendError(ctx, fasthttp.StatusBadRequest, fmt.Sprintf("Invalid provider: %v", err), h.logger) + return + } + + config, err := h.store.GetProviderConfig(provider) + if err != nil { + SendError(ctx, fasthttp.StatusNotFound, fmt.Sprintf("Provider not found: %v", err), h.logger) + return + } + + response := ProviderResponse{ + Provider: provider, + Config: config, + Status: "active", + } + + SendJSON(ctx, response, h.logger) +} + +// AddProvider handles POST /providers - Add a new provider +func (h *ProviderHandler) AddProvider(ctx *fasthttp.RequestCtx) { + var req AddProviderRequest + if err := json.Unmarshal(ctx.PostBody(), &req); err != nil { + SendError(ctx, fasthttp.StatusBadRequest, fmt.Sprintf("Invalid JSON: %v", err), h.logger) + return + } + + // Validate provider + if req.Provider == "" { + SendError(ctx, fasthttp.StatusBadRequest, "Missing provider", h.logger) + return + } + + // Validate required keys + if len(req.Keys) == 0 { + SendError(ctx, fasthttp.StatusBadRequest, "At least one API key is required", h.logger) + return + } + + if req.ConcurrencyAndBufferSize != nil { + if req.ConcurrencyAndBufferSize.Concurrency == 0 { + SendError(ctx, fasthttp.StatusBadRequest, "Concurrency must be greater than 0", h.logger) + return + } + if req.ConcurrencyAndBufferSize.BufferSize == 0 { + SendError(ctx, fasthttp.StatusBadRequest, "Buffer size must be greater than 0", h.logger) + return + } + } + + // Check if provider already exists + if _, err := h.store.GetProviderConfig(req.Provider); err == nil { + SendError(ctx, fasthttp.StatusConflict, fmt.Sprintf("Provider %s already exists", req.Provider), h.logger) + return + } + + // Construct ProviderConfig from individual fields + config := lib.ProviderConfig{ + Keys: req.Keys, + NetworkConfig: req.NetworkConfig, + ConcurrencyAndBufferSize: req.ConcurrencyAndBufferSize, + } + + // Handle meta config if provided + if req.MetaConfig != nil && len(*req.MetaConfig) > 0 { + // Convert to appropriate meta config type based on provider + metaConfig, err := h.convertToProviderMetaConfig(req.Provider, *req.MetaConfig) + if err != nil { + SendError(ctx, fasthttp.StatusBadRequest, fmt.Sprintf("Invalid meta config: %v", err), h.logger) + return + } + config.MetaConfig = metaConfig + } + + // Add provider to store (env vars will be processed by store) + if err := h.store.AddProvider(req.Provider, config); err != nil { + h.logger.Warn(fmt.Sprintf("Failed to add provider %s: %v", req.Provider, err)) + SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Failed to add provider: %v", err), h.logger) + return + } + + h.logger.Info(fmt.Sprintf("Provider %s added successfully", req.Provider)) + + response := ProviderResponse{ + Provider: req.Provider, + Config: &config, + Status: "added", + Message: fmt.Sprintf("Provider %s added successfully", req.Provider), + } + + SendJSON(ctx, response, h.logger) +} + +// UpdateProvider handles PUT /providers/{provider} - Update provider config +// NOTE: This endpoint expects ALL fields to be provided in the request body, +// including both edited and non-edited fields. Partial updates are not supported. +// The frontend should send the complete provider configuration. +func (h *ProviderHandler) UpdateProvider(ctx *fasthttp.RequestCtx) { + provider, err := getProviderFromCtx(ctx) + if err != nil { + SendError(ctx, fasthttp.StatusBadRequest, fmt.Sprintf("Invalid provider: %v", err), h.logger) + return + } + + var req UpdateProviderRequest + if err := json.Unmarshal(ctx.PostBody(), &req); err != nil { + SendError(ctx, fasthttp.StatusBadRequest, fmt.Sprintf("Invalid JSON: %v", err), h.logger) + return + } + + // Check if provider exists + oldConfig, err := h.store.GetProviderConfig(provider) + if err != nil { + SendError(ctx, fasthttp.StatusNotFound, fmt.Sprintf("Provider not found: %v", err), h.logger) + return + } + + // Construct ProviderConfig from individual fields + config := lib.ProviderConfig{ + Keys: oldConfig.Keys, + NetworkConfig: oldConfig.NetworkConfig, + ConcurrencyAndBufferSize: oldConfig.ConcurrencyAndBufferSize, + } + + // Validate required keys (at least one key must be provided) + if req.Keys != nil { + if len(*req.Keys) == 0 { + SendError(ctx, fasthttp.StatusBadRequest, "At least one API key is required", h.logger) + return + } + config.Keys = *req.Keys + } + + // Handle meta config if provided + if req.MetaConfig != nil && len(*req.MetaConfig) > 0 { + // Convert to appropriate meta config type based on provider + metaConfig, err := h.convertToProviderMetaConfig(provider, *req.MetaConfig) + if err != nil { + SendError(ctx, fasthttp.StatusBadRequest, fmt.Sprintf("Invalid meta config: %v", err), h.logger) + return + } + config.MetaConfig = metaConfig + } + + if req.ConcurrencyAndBufferSize != nil { + if req.ConcurrencyAndBufferSize.Concurrency == 0 { + SendError(ctx, fasthttp.StatusBadRequest, "Concurrency must be greater than 0", h.logger) + return + } + if req.ConcurrencyAndBufferSize.BufferSize == 0 { + SendError(ctx, fasthttp.StatusBadRequest, "Buffer size must be greater than 0", h.logger) + return + } + config.ConcurrencyAndBufferSize = req.ConcurrencyAndBufferSize + } + + // Update provider config in store (env vars will be processed by store) + if err := h.store.UpdateProviderConfig(provider, config); err != nil { + h.logger.Warn(fmt.Sprintf("Failed to update provider %s: %v", provider, err)) + SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Failed to update provider: %v", err), h.logger) + return + } + + // Update concurrency and queue configuration in Bifrost + if err := h.client.UpdateProviderConcurrency(provider); err != nil { + // Note: Store update succeeded, continue but log the concurrency update failure + h.logger.Warn(fmt.Sprintf("Failed to update concurrency for provider %s: %v", provider, err)) + } + + h.logger.Info(fmt.Sprintf("Provider %s updated successfully", provider)) + + response := ProviderResponse{ + Provider: provider, + Config: &config, + Status: "updated", + Message: fmt.Sprintf("Provider %s updated successfully", provider), + } + + SendJSON(ctx, response, h.logger) +} + +// DeleteProvider handles DELETE /providers/{provider} - Remove provider +func (h *ProviderHandler) DeleteProvider(ctx *fasthttp.RequestCtx) { + provider, err := getProviderFromCtx(ctx) + if err != nil { + SendError(ctx, fasthttp.StatusBadRequest, fmt.Sprintf("Invalid provider: %v", err), h.logger) + return + } + + // Check if provider exists + if _, err := h.store.GetProviderConfig(provider); err != nil { + SendError(ctx, fasthttp.StatusNotFound, fmt.Sprintf("Provider not found: %v", err), h.logger) + return + } + + // Remove provider from store + if err := h.store.RemoveProvider(provider); err != nil { + h.logger.Warn(fmt.Sprintf("Failed to remove provider %s: %v", provider, err)) + SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Failed to remove provider: %v", err), h.logger) + return + } + + h.logger.Info(fmt.Sprintf("Provider %s removed successfully", provider)) + + response := ProviderResponse{ + Provider: provider, + Status: "removed", + Message: fmt.Sprintf("Provider %s removed successfully", provider), + } + + SendJSON(ctx, response, h.logger) +} + +// SaveConfig handles POST /config/save - Persist current configuration to JSON file +func (h *ProviderHandler) SaveConfig(ctx *fasthttp.RequestCtx) { + // Save current configuration back to the original JSON file + if err := h.store.SaveConfig(); err != nil { + h.logger.Warn(fmt.Sprintf("Failed to save configuration: %v", err)) + SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Failed to save configuration: %v", err), h.logger) + return + } + + h.logger.Info("Configuration saved successfully") + + response := map[string]interface{}{ + "status": "success", + "message": "Configuration saved successfully", + } + + SendJSON(ctx, response, h.logger) +} + +// convertToProviderMetaConfig converts a generic map to the appropriate provider-specific meta config +func (h *ProviderHandler) convertToProviderMetaConfig(provider schemas.ModelProvider, metaConfigMap map[string]interface{}) (*schemas.MetaConfig, error) { + if len(metaConfigMap) == 0 { + return nil, nil + } + + // Convert map to JSON and then to specific meta config type + metaConfigJSON, err := json.Marshal(metaConfigMap) + if err != nil { + return nil, fmt.Errorf("failed to marshal meta config: %w", err) + } + + switch provider { + case schemas.Azure: + var azureMetaConfig meta.AzureMetaConfig + if err := json.Unmarshal(metaConfigJSON, &azureMetaConfig); err != nil { + return nil, fmt.Errorf("failed to unmarshal Azure meta config: %w", err) + } + var metaConfig schemas.MetaConfig = &azureMetaConfig + return &metaConfig, nil + + case schemas.Bedrock: + var bedrockMetaConfig meta.BedrockMetaConfig + if err := json.Unmarshal(metaConfigJSON, &bedrockMetaConfig); err != nil { + return nil, fmt.Errorf("failed to unmarshal Bedrock meta config: %w", err) + } + var metaConfig schemas.MetaConfig = &bedrockMetaConfig + return &metaConfig, nil + + case schemas.Vertex: + var vertexMetaConfig meta.VertexMetaConfig + if err := json.Unmarshal(metaConfigJSON, &vertexMetaConfig); err != nil { + return nil, fmt.Errorf("failed to unmarshal Vertex meta config: %w", err) + } + var metaConfig schemas.MetaConfig = &vertexMetaConfig + return &metaConfig, nil + + default: + // For providers that don't support meta config, return nil + return nil, nil + } +} + +func getProviderFromCtx(ctx *fasthttp.RequestCtx) (schemas.ModelProvider, error) { + providerValue := ctx.UserValue("provider") + if providerValue == nil { + return "", fmt.Errorf("missing provider parameter") + } + providerStr, ok := providerValue.(string) + if !ok { + return "", fmt.Errorf("invalid provider parameter type") + } + + return schemas.ModelProvider(providerStr), nil +} diff --git a/transports/bifrost-http/handlers/utils.go b/transports/bifrost-http/handlers/utils.go new file mode 100644 index 0000000000..ffded4a5dc --- /dev/null +++ b/transports/bifrost-http/handlers/utils.go @@ -0,0 +1,50 @@ +// Package handlers provides HTTP request handlers for the Bifrost HTTP transport. +// This file contains common utility functions used across all handlers. +package handlers + +import ( + "encoding/json" + "fmt" + + "github.com/maximhq/bifrost/core/schemas" + "github.com/valyala/fasthttp" +) + +// SendJSON sends a JSON response with 200 OK status +func SendJSON(ctx *fasthttp.RequestCtx, data interface{}, logger schemas.Logger) { + ctx.SetStatusCode(fasthttp.StatusOK) + ctx.SetContentType("application/json") + + if err := json.NewEncoder(ctx).Encode(data); err != nil { + logger.Warn(fmt.Sprintf("Failed to encode JSON response: %v", err)) + SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Failed to encode response: %v", err), logger) + } +} + +// SendError sends a BifrostError response +func SendError(ctx *fasthttp.RequestCtx, statusCode int, message string, logger schemas.Logger) { + bifrostErr := &schemas.BifrostError{ + IsBifrostError: false, + StatusCode: &statusCode, + Error: schemas.ErrorField{ + Message: message, + }, + } + SendBifrostError(ctx, bifrostErr, logger) +} + +// SendBifrostError sends a BifrostError response +func SendBifrostError(ctx *fasthttp.RequestCtx, bifrostErr *schemas.BifrostError, logger schemas.Logger) { + if bifrostErr.StatusCode != nil { + ctx.SetStatusCode(*bifrostErr.StatusCode) + } else { + ctx.SetStatusCode(fasthttp.StatusInternalServerError) + } + + ctx.SetContentType("application/json") + if encodeErr := json.NewEncoder(ctx).Encode(bifrostErr); encodeErr != nil { + logger.Warn(fmt.Sprintf("Failed to encode error response: %v", encodeErr)) + ctx.SetStatusCode(fasthttp.StatusInternalServerError) + ctx.SetBodyString(fmt.Sprintf("Failed to encode error response: %v", encodeErr)) + } +} diff --git a/transports/bifrost-http/lib/account.go b/transports/bifrost-http/lib/account.go index 0e96c6abd0..757b7bda81 100644 --- a/transports/bifrost-http/lib/account.go +++ b/transports/bifrost-http/lib/account.go @@ -3,54 +3,62 @@ package lib import ( - "errors" "fmt" - "os" - "reflect" - "strings" - "sync" "github.com/maximhq/bifrost/core/schemas" ) // BaseAccount implements the Account interface for Bifrost. -// It manages provider configurations and API keys. +// It manages provider configurations using a bbolt store for persistent storage. +// All data processing (environment variables, meta configs) is done upfront in the store. type BaseAccount struct { - Config ConfigMap // Map of provider configurations - mu sync.Mutex // Mutex to protect Config access + store *ConfigStore // bbolt store for persistent configuration +} + +// NewBaseAccount creates a new BaseAccount with the given store +func NewBaseAccount(store *ConfigStore) *BaseAccount { + return &BaseAccount{ + store: store, + } } // GetConfiguredProviders returns a list of all configured providers. // Implements the Account interface. func (baseAccount *BaseAccount) GetConfiguredProviders() ([]schemas.ModelProvider, error) { - baseAccount.mu.Lock() - defer baseAccount.mu.Unlock() - - providers := make([]schemas.ModelProvider, 0, len(baseAccount.Config)) - for provider := range baseAccount.Config { - providers = append(providers, provider) + if baseAccount.store == nil { + return nil, fmt.Errorf("store not initialized") } - return providers, nil + + return baseAccount.store.GetAllProviders() } // GetKeysForProvider returns the API keys configured for a specific provider. +// Keys are already processed (environment variables resolved) by the store. // Implements the Account interface. func (baseAccount *BaseAccount) GetKeysForProvider(providerKey schemas.ModelProvider) ([]schemas.Key, error) { - baseAccount.mu.Lock() - defer baseAccount.mu.Unlock() + if baseAccount.store == nil { + return nil, fmt.Errorf("store not initialized") + } + + config, err := baseAccount.store.GetProviderConfig(providerKey) + if err != nil { + return nil, err + } - return baseAccount.Config[providerKey].Keys, nil + return config.Keys, nil } // GetConfigForProvider returns the complete configuration for a specific provider. +// Configuration is already fully processed (environment variables, meta configs) by the store. // Implements the Account interface. func (baseAccount *BaseAccount) GetConfigForProvider(providerKey schemas.ModelProvider) (*schemas.ProviderConfig, error) { - baseAccount.mu.Lock() - defer baseAccount.mu.Unlock() + if baseAccount.store == nil { + return nil, fmt.Errorf("store not initialized") + } - config, exists := baseAccount.Config[providerKey] - if !exists { - return nil, errors.New("config for provider not found") + config, err := baseAccount.store.GetProviderConfig(providerKey) + if err != nil { + return nil, err } providerConfig := &schemas.ProviderConfig{} @@ -73,103 +81,3 @@ func (baseAccount *BaseAccount) GetConfigForProvider(providerKey schemas.ModelPr return providerConfig, nil } - -// ReadKeys reads environment variables from the environment and updates the provider configurations. -// It replaces values starting with "env." in the config with actual values from the environment. -// Returns an error if any required environment variable is missing. -func (baseAccount *BaseAccount) ReadKeys() error { - // Helper function to check and replace env values - replaceEnvValue := func(value string) (string, error) { - if strings.HasPrefix(value, "env.") { - envKey := strings.TrimPrefix(value, "env.") - if envValue := os.Getenv(envKey); envValue != "" { - return envValue, nil - } - return "", fmt.Errorf("environment variable %s not found in the environment", envKey) - } - return value, nil - } - - // Helper function to recursively check and replace env values in a struct - var processStruct func(interface{}) error - processStruct = func(v interface{}) error { - val := reflect.ValueOf(v) - - // Dereference pointer if present - if val.Kind() == reflect.Ptr { - val = val.Elem() - } - - // Handle interface types - if val.Kind() == reflect.Interface { - val = val.Elem() - // If the interface value is a pointer, dereference it - if val.Kind() == reflect.Ptr { - val = val.Elem() - } - } - - if val.Kind() != reflect.Struct { - return nil - } - - typ := val.Type() - for i := 0; i < val.NumField(); i++ { - field := val.Field(i) - fieldType := typ.Field(i) - - // Skip unexported fields - if !field.CanSet() { - continue - } - - switch field.Kind() { - case reflect.String: - if field.CanSet() { - value := field.String() - if strings.HasPrefix(value, "env.") { - newValue, err := replaceEnvValue(value) - if err != nil { - return fmt.Errorf("field %s: %w", fieldType.Name, err) - } - field.SetString(newValue) - } - } - case reflect.Interface: - if !field.IsNil() { - if err := processStruct(field.Interface()); err != nil { - return err - } - } - } - } - return nil - } - - // Lock the config map for the entire update operation - baseAccount.mu.Lock() - defer baseAccount.mu.Unlock() - - // Check and replace values in provider configs - for provider, config := range baseAccount.Config { - // Check keys - for i, key := range config.Keys { - newValue, err := replaceEnvValue(key.Value) - if err != nil { - return fmt.Errorf("provider %s: %w", provider, err) - } - config.Keys[i].Value = newValue - } - - // Check meta config if it exists - if config.MetaConfig != nil { - if err := processStruct(config.MetaConfig); err != nil { - return fmt.Errorf("provider %s: %w", provider, err) - } - } - - baseAccount.Config[provider] = config - } - - return nil -} diff --git a/transports/bifrost-http/lib/store.go b/transports/bifrost-http/lib/store.go new file mode 100644 index 0000000000..287d258937 --- /dev/null +++ b/transports/bifrost-http/lib/store.go @@ -0,0 +1,565 @@ +// Package lib provides core functionality for the Bifrost HTTP service, +// including context propagation, header management, and integration with monitoring systems. +package lib + +import ( + "encoding/json" + "fmt" + "os" + "strings" + "sync" + + "github.com/maximhq/bifrost/core/schemas" + "github.com/maximhq/bifrost/core/schemas/meta" +) + +// ConfigStore represents a high-performance in-memory configuration store for Bifrost. +// It provides thread-safe access to provider configurations with the ability to +// persist changes back to the original JSON configuration file. +// +// Features: +// - Pure in-memory storage for ultra-fast access +// - Environment variable processing for API keys and meta configurations +// - Thread-safe operations with read-write mutexes +// - Real-time configuration updates via HTTP API +// - Explicit persistence control via WriteConfigToFile() +// - Support for all provider-specific meta configurations (Azure, Bedrock, Vertex) +type ConfigStore struct { + mu sync.RWMutex + logger schemas.Logger + configPath string // Path to the original JSON config file + + // In-memory storage + providers map[schemas.ModelProvider]ProviderConfig + mcpConfig *schemas.MCPConfig +} + +// NewConfigStore creates a new in-memory configuration store instance. +func NewConfigStore(logger schemas.Logger) (*ConfigStore, error) { + return &ConfigStore{ + logger: logger, + providers: make(map[schemas.ModelProvider]ProviderConfig), + }, nil +} + +// LoadFromConfig loads initial configuration from a JSON config file into memory +// with full preprocessing including environment variable resolution and meta config parsing. +// All processing is done upfront to ensure zero latency when retrieving data. +// +// This method handles: +// - JSON config file parsing +// - Environment variable substitution for API keys (env.VARIABLE_NAME) +// - Provider-specific meta config processing (Azure, Bedrock, Vertex) +// - Case conversion for provider names (e.g., "OpenAI" -> "openai") +// - In-memory storage for ultra-fast access during request processing +func (s *ConfigStore) LoadFromConfig(configPath string) error { + s.mu.Lock() + defer s.mu.Unlock() + + s.configPath = configPath + s.logger.Info(fmt.Sprintf("Loading configuration from: %s", configPath)) + + // Read and parse the JSON config file + data, err := os.ReadFile(configPath) + if err != nil { + return fmt.Errorf("failed to read config file: %w", err) + } + + // Parse the JSON directly + var configData struct { + Providers map[string]json.RawMessage `json:"providers"` + MCP json.RawMessage `json:"mcp,omitempty"` + } + + if err := json.Unmarshal(data, &configData); err != nil { + return fmt.Errorf("failed to unmarshal config: %w", err) + } + + // Process provider configurations + processedProviders := make(map[schemas.ModelProvider]ProviderConfig) + + // First unmarshal providers into a map with string keys to handle case conversion + var rawProviders map[string]ProviderConfig + if providersBytes, err := json.Marshal(configData.Providers); err != nil { + return fmt.Errorf("failed to marshal providers: %w", err) + } else if err := json.Unmarshal(providersBytes, &rawProviders); err != nil { + return fmt.Errorf("failed to unmarshal providers: %w", err) + } + + // Create a temporary structure to unmarshal the full JSON with proper meta configs + var tempConfig struct { + Providers map[string]struct { + MetaConfig json.RawMessage `json:"meta_config"` + } `json:"providers"` + } + + if err := json.Unmarshal(data, &tempConfig); err != nil { + return fmt.Errorf("failed to unmarshal configuration file: %w", err) + } + + // Process each provider configuration + for rawProviderName, cfg := range rawProviders { + provider := schemas.ModelProvider(strings.ToLower(rawProviderName)) + + // Process meta config if it exists + if tempProvider, exists := tempConfig.Providers[rawProviderName]; exists && len(tempProvider.MetaConfig) > 0 { + processedMetaConfig, err := s.processMetaConfigEnvVars(tempProvider.MetaConfig, provider) + if err != nil { + s.logger.Warn(fmt.Sprintf("failed to process env vars in meta config for %s: %v", provider, err)) + continue + } + + // Parse and set the meta config + metaConfig, err := s.parseMetaConfig(processedMetaConfig, provider) + if err != nil { + s.logger.Warn(fmt.Sprintf("failed to parse meta config for %s: %v", provider, err)) + continue + } else { + cfg.MetaConfig = metaConfig + } + } + + // Process environment variables in keys + for i, key := range cfg.Keys { + processedValue, err := s.replaceEnvValue(key.Value) + if err != nil { + s.logger.Warn(fmt.Sprintf("failed to process env vars in keys for %s: %v", provider, err)) + continue + } + cfg.Keys[i].Value = processedValue + } + + processedProviders[provider] = cfg + } + + // Store processed configurations in memory + s.providers = processedProviders + + // Parse MCP config if present + if len(configData.MCP) > 0 { + var mcpConfig schemas.MCPConfig + if err := json.Unmarshal(configData.MCP, &mcpConfig); err != nil { + s.logger.Warn(fmt.Sprintf("failed to parse MCP config: %v", err)) + } else { + // Process environment variables in MCP config + s.mcpConfig = &mcpConfig + s.processMCPEnvVars() + } + } + + s.logger.Info("Successfully loaded configuration.") + return nil +} + +// WriteConfigToFile writes the current in-memory configuration back to a JSON file +// in the exact same format that LoadFromConfig expects. This enables persistence +// of runtime configuration changes. +func (s *ConfigStore) WriteConfigToFile(configPath string) error { + s.mu.RLock() + defer s.mu.RUnlock() + + s.logger.Info(fmt.Sprintf("Writing current configuration to: %s", configPath)) + + // Prepare the output structure + output := struct { + Providers map[string]interface{} `json:"providers"` + MCP *schemas.MCPConfig `json:"mcp,omitempty"` + }{ + Providers: make(map[string]interface{}), + MCP: s.mcpConfig, + } + + // Convert providers back to the original format + for provider, config := range s.providers { + providerName := string(provider) + + // Create provider config without processed values (keep env.* references) + providerConfig := map[string]interface{}{ + "keys": config.Keys, // Note: This will contain actual values, not env refs + } + + if config.NetworkConfig != nil { + providerConfig["network_config"] = config.NetworkConfig + } + + if config.ConcurrencyAndBufferSize != nil { + providerConfig["concurrency_and_buffer_size"] = config.ConcurrencyAndBufferSize + } + + if config.MetaConfig != nil { + providerConfig["meta_config"] = *config.MetaConfig + } + + output.Providers[providerName] = providerConfig + } + + // Marshal to JSON with proper formatting + data, err := json.MarshalIndent(output, "", " ") + if err != nil { + return fmt.Errorf("failed to marshal config: %w", err) + } + + // Write to file + if err := os.WriteFile(configPath, data, 0644); err != nil { + return fmt.Errorf("failed to write config file: %w", err) + } + + s.logger.Info(fmt.Sprintf("Successfully wrote configuration to: %s", configPath)) + return nil +} + +// SaveConfig writes the current configuration back to the original config file path +func (s *ConfigStore) SaveConfig() error { + if s.configPath == "" { + return fmt.Errorf("no config path set - use LoadFromConfig first") + } + return s.WriteConfigToFile(s.configPath) +} + +// parseMetaConfig converts raw JSON to the appropriate provider-specific meta config interface +func (s *ConfigStore) parseMetaConfig(rawMetaConfig json.RawMessage, provider schemas.ModelProvider) (*schemas.MetaConfig, error) { + switch provider { + case schemas.Azure: + var azureMetaConfig meta.AzureMetaConfig + if err := json.Unmarshal(rawMetaConfig, &azureMetaConfig); err != nil { + return nil, fmt.Errorf("failed to unmarshal Azure meta config: %w", err) + } + var metaConfig schemas.MetaConfig = &azureMetaConfig + return &metaConfig, nil + + case schemas.Bedrock: + var bedrockMetaConfig meta.BedrockMetaConfig + if err := json.Unmarshal(rawMetaConfig, &bedrockMetaConfig); err != nil { + return nil, fmt.Errorf("failed to unmarshal Bedrock meta config: %w", err) + } + var metaConfig schemas.MetaConfig = &bedrockMetaConfig + return &metaConfig, nil + + case schemas.Vertex: + var vertexMetaConfig meta.VertexMetaConfig + if err := json.Unmarshal(rawMetaConfig, &vertexMetaConfig); err != nil { + return nil, fmt.Errorf("failed to unmarshal Vertex meta config: %w", err) + } + var metaConfig schemas.MetaConfig = &vertexMetaConfig + return &metaConfig, nil + } + + return nil, fmt.Errorf("unsupported provider for meta config: %s", provider) +} + +// replaceEnvValue checks and replaces environment variable references in configuration values. +// Supports the "env.VARIABLE_NAME" syntax for referencing environment variables. +// This enables secure configuration management without hardcoding sensitive values. +// +// Examples: +// - "env.OPENAI_API_KEY" -> actual value from OPENAI_API_KEY environment variable +// - "sk-1234567890" -> returned as-is (no env prefix) +// +// Returns an error if the referenced environment variable is not found. +func (s *ConfigStore) replaceEnvValue(value string) (string, error) { + if strings.HasPrefix(value, "env.") { + envKey := strings.TrimPrefix(value, "env.") + if envValue := os.Getenv(envKey); envValue != "" { + return envValue, nil + } + return "", fmt.Errorf("environment variable %s not found", envKey) + } + return value, nil +} + +// processMetaConfigEnvVars processes environment variables in provider-specific meta configurations. +// This method handles the provider-specific meta config structures and processes environment +// variables in their fields, ensuring type safety and proper field handling. +// +// Supported providers and their processed fields: +// - Azure: Endpoint, APIVersion +// - Bedrock: SecretAccessKey, Region, SessionToken, ARN +// - Vertex: ProjectID, Region, AuthCredentials +// +// For unsupported providers, the meta config is returned unchanged. +// This approach ensures type safety while supporting environment variable substitution. +func (s *ConfigStore) processMetaConfigEnvVars(rawMetaConfig json.RawMessage, provider schemas.ModelProvider) (json.RawMessage, error) { + switch provider { + case schemas.Azure: + var azureMetaConfig meta.AzureMetaConfig + if err := json.Unmarshal(rawMetaConfig, &azureMetaConfig); err != nil { + return nil, fmt.Errorf("failed to unmarshal Azure meta config: %w", err) + } + + endpoint, err := s.replaceEnvValue(azureMetaConfig.Endpoint) + if err != nil { + return nil, err + } + azureMetaConfig.Endpoint = endpoint + if azureMetaConfig.APIVersion != nil { + apiVersion, err := s.replaceEnvValue(*azureMetaConfig.APIVersion) + if err != nil { + return nil, err + } + azureMetaConfig.APIVersion = &apiVersion + } + + processedJSON, err := json.Marshal(azureMetaConfig) + if err != nil { + return nil, fmt.Errorf("failed to marshal processed Azure meta config: %w", err) + } + return processedJSON, nil + + case schemas.Bedrock: + var bedrockMetaConfig meta.BedrockMetaConfig + if err := json.Unmarshal(rawMetaConfig, &bedrockMetaConfig); err != nil { + return nil, fmt.Errorf("failed to unmarshal Bedrock meta config: %w", err) + } + + secretAccessKey, err := s.replaceEnvValue(bedrockMetaConfig.SecretAccessKey) + if err != nil { + return nil, err + } + bedrockMetaConfig.SecretAccessKey = secretAccessKey + + if bedrockMetaConfig.Region != nil { + region, err := s.replaceEnvValue(*bedrockMetaConfig.Region) + if err != nil { + return nil, err + } + bedrockMetaConfig.Region = ®ion + } + + if bedrockMetaConfig.SessionToken != nil { + sessionToken, err := s.replaceEnvValue(*bedrockMetaConfig.SessionToken) + if err != nil { + return nil, err + } + bedrockMetaConfig.SessionToken = &sessionToken + } + + if bedrockMetaConfig.ARN != nil { + arn, err := s.replaceEnvValue(*bedrockMetaConfig.ARN) + if err != nil { + return nil, err + } + bedrockMetaConfig.ARN = &arn + } + + processedJSON, err := json.Marshal(bedrockMetaConfig) + if err != nil { + return nil, fmt.Errorf("failed to marshal processed Bedrock meta config: %w", err) + } + return processedJSON, nil + + case schemas.Vertex: + var vertexMetaConfig meta.VertexMetaConfig + if err := json.Unmarshal(rawMetaConfig, &vertexMetaConfig); err != nil { + return nil, fmt.Errorf("failed to unmarshal Vertex meta config: %w", err) + } + + projectID, err := s.replaceEnvValue(vertexMetaConfig.ProjectID) + if err != nil { + return nil, err + } + vertexMetaConfig.ProjectID = projectID + + region, err := s.replaceEnvValue(vertexMetaConfig.Region) + if err != nil { + return nil, err + } + vertexMetaConfig.Region = region + + authCredentials, err := s.replaceEnvValue(vertexMetaConfig.AuthCredentials) + if err != nil { + return nil, err + } + vertexMetaConfig.AuthCredentials = authCredentials + + processedJSON, err := json.Marshal(vertexMetaConfig) + if err != nil { + return nil, fmt.Errorf("failed to marshal processed Vertex meta config: %w", err) + } + return processedJSON, nil + } + + return rawMetaConfig, nil +} + +// GetProviderConfig retrieves a fully processed provider configuration from memory. +// This is the primary method called by the account interface and is optimized for minimal latency. +// +// Performance characteristics: +// - Memory access: ultra-fast direct memory access +// - No database I/O or JSON parsing overhead +// - Thread-safe with read locks for concurrent access +// +// Returns a copy of the configuration to prevent external modifications. +func (s *ConfigStore) GetProviderConfig(provider schemas.ModelProvider) (*ProviderConfig, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + config, exists := s.providers[provider] + if !exists { + return nil, fmt.Errorf("provider %s not found", provider) + } + + // Return a copy to prevent external modifications + configCopy := config + return &configCopy, nil +} + +// GetAllProviders returns all configured providers. +func (s *ConfigStore) GetAllProviders() ([]schemas.ModelProvider, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + providers := make([]schemas.ModelProvider, 0, len(s.providers)) + for provider := range s.providers { + providers = append(providers, provider) + } + + return providers, nil +} + +// UpdateProviderConfig updates a provider configuration in memory with full environment +// variable processing. This method is called when provider configurations are modified +// via the HTTP API and ensures all data processing is done upfront. +// +// The method: +// - Processes environment variables in API keys and meta configurations +// - Stores the processed configuration in memory +// - Updates metadata and timestamps +// - Thread-safe operation with write locks +func (s *ConfigStore) UpdateProviderConfig(provider schemas.ModelProvider, config ProviderConfig) error { + s.mu.Lock() + defer s.mu.Unlock() + + // Process environment variables in keys + for i, key := range config.Keys { + processedValue, err := s.replaceEnvValue(key.Value) + if err != nil { + return fmt.Errorf("failed to process env var in key: %w", err) + } + config.Keys[i].Value = processedValue + } + + // Process environment variables in meta config if present + if config.MetaConfig != nil { + rawMetaData, err := json.Marshal(*config.MetaConfig) + if err != nil { + return fmt.Errorf("failed to marshal meta config: %w", err) + } + + processedMetaData, err := s.processMetaConfigEnvVars(rawMetaData, provider) + if err != nil { + return fmt.Errorf("failed to process env vars in meta config: %w", err) + } + + metaConfig, err := s.parseMetaConfig(processedMetaData, provider) + if err != nil { + return fmt.Errorf("failed to parse processed meta config: %w", err) + } + config.MetaConfig = metaConfig + } + + s.providers[provider] = config + + s.logger.Info(fmt.Sprintf("Updated configuration for provider: %s", provider)) + return nil +} + +// AddProvider adds a new provider configuration to memory with full environment variable +// processing. This method is called when new providers are added via the HTTP API. +// +// The method: +// - Validates that the provider doesn't already exist +// - Processes environment variables in API keys and meta configurations +// - Stores the processed configuration in memory +// - Updates metadata and timestamps +func (s *ConfigStore) AddProvider(provider schemas.ModelProvider, config ProviderConfig) error { + s.mu.Lock() + defer s.mu.Unlock() + + // Check if provider already exists + if _, exists := s.providers[provider]; exists { + return fmt.Errorf("provider %s already exists", provider) + } + + // Process environment variables in keys + for i, key := range config.Keys { + processedValue, err := s.replaceEnvValue(key.Value) + if err != nil { + return fmt.Errorf("failed to process env var in key: %w", err) + } + config.Keys[i].Value = processedValue + } + + // Process environment variables in meta config if present + if config.MetaConfig != nil { + rawMetaData, err := json.Marshal(*config.MetaConfig) + if err != nil { + return fmt.Errorf("failed to marshal meta config: %w", err) + } + + processedMetaData, err := s.processMetaConfigEnvVars(rawMetaData, provider) + if err != nil { + return fmt.Errorf("failed to process env vars in meta config: %w", err) + } + + metaConfig, err := s.parseMetaConfig(processedMetaData, provider) + if err != nil { + return fmt.Errorf("failed to parse processed meta config: %w", err) + } + config.MetaConfig = metaConfig + } + + s.providers[provider] = config + + s.logger.Info(fmt.Sprintf("Added provider: %s", provider)) + return nil +} + +// RemoveProvider removes a provider configuration from memory. +func (s *ConfigStore) RemoveProvider(provider schemas.ModelProvider) error { + s.mu.Lock() + defer s.mu.Unlock() + + if _, exists := s.providers[provider]; !exists { + return fmt.Errorf("provider %s not found", provider) + } + + delete(s.providers, provider) + + s.logger.Info(fmt.Sprintf("Removed provider: %s", provider)) + return nil +} + +// processMCPEnvVars processes environment variables in the MCP configuration. +// This method handles the MCP config structures and processes environment +// variables in their fields, ensuring type safety and proper field handling. +// +// Supported fields that are processed: +// - ConnectionString in each MCP ClientConfig +// +// Returns an error if any required environment variable is missing. +// This approach ensures type safety while supporting environment variable substitution. +func (s *ConfigStore) processMCPEnvVars() { + // Process each client config + for i, clientConfig := range s.mcpConfig.ClientConfigs { + // Process ConnectionString if present + if clientConfig.ConnectionString != nil { + newValue, err := s.replaceEnvValue(*clientConfig.ConnectionString) + if err != nil { + s.logger.Warn(fmt.Sprintf("failed to process env vars in MCP client %s: %v", clientConfig.Name, err)) + continue + } + s.mcpConfig.ClientConfigs[i].ConnectionString = &newValue + } + } +} + +// GetMCPConfig retrieves the processed MCP configuration from memory. +// Returns nil if no MCP configuration was loaded. +// The returned configuration has all environment variables already processed. +func (s *ConfigStore) GetMCPConfig() *schemas.MCPConfig { + s.mu.RLock() + defer s.mu.RUnlock() + + return s.mcpConfig +} diff --git a/transports/bifrost-http/main.go b/transports/bifrost-http/main.go index a9ec996116..304ee8d62c 100644 --- a/transports/bifrost-http/main.go +++ b/transports/bifrost-http/main.go @@ -1,16 +1,32 @@ // Package http provides an HTTP service using FastHTTP that exposes endpoints // for text and chat completions using various AI model providers (OpenAI, Anthropic, Bedrock, Mistral, Ollama, etc.). // -// The HTTP service provides three main endpoints: +// The HTTP service provides the following main endpoints: // - /v1/text/completions: For text completion requests // - /v1/chat/completions: For chat completion requests // - /v1/mcp/tool/execute: For MCP tool execution requests +// - /providers/*: For provider configuration management // -// Configuration is handled through a JSON config file and environment variables: +// Configuration is handled through a JSON config file, high-performance ConfigStore, and environment variables: // - Use -config flag to specify the config file location // - Use -port flag to specify the server port (default: 8080) // - Use -pool-size flag to specify the initial connection pool size (default: 300) // +// ConfigStore Features: +// - Pure in-memory storage for ultra-fast config access +// - Environment variable processing for secure configuration management +// - Real-time configuration updates via HTTP API +// - Explicit persistence control via POST /config/save endpoint +// - Provider-specific meta config support (Azure, Bedrock, Vertex) +// - Thread-safe operations with concurrent request handling +// - Statistics and monitoring endpoints for operational insights +// +// Performance Optimizations: +// - Configuration data is processed once during startup and stored in memory +// - Ultra-fast memory access eliminates I/O overhead on every request +// - All environment variable processing done upfront during configuration loading +// - Thread-safe concurrent access with read-write mutex protection +// // Example usage: // // go run main.go -config config.example.json -port 8080 -pool-size 300 @@ -28,13 +44,12 @@ // - Anthropic: POST /anthropic/v1/messages (accepts Anthropic Messages requests) // // This allows clients to use their existing integration code without modification while benefiting -// from Bifrost's unified model routing, fallbacks, and monitoring capabilities. +// from Bifrost's unified model routing, fallbacks, monitoring capabilities, and high-performance configuration management. // // NOTE: Streaming is not supported yet so all the flags related to streaming are ignored. (in both bifrost and its integrations) package main import ( - "encoding/json" "flag" "fmt" "log" @@ -45,11 +60,7 @@ import ( bifrost "github.com/maximhq/bifrost/core" schemas "github.com/maximhq/bifrost/core/schemas" "github.com/maximhq/bifrost/plugins/maxim" - "github.com/maximhq/bifrost/transports/bifrost-http/integrations" - "github.com/maximhq/bifrost/transports/bifrost-http/integrations/anthropic" - "github.com/maximhq/bifrost/transports/bifrost-http/integrations/genai" - "github.com/maximhq/bifrost/transports/bifrost-http/integrations/litellm" - "github.com/maximhq/bifrost/transports/bifrost-http/integrations/openai" + "github.com/maximhq/bifrost/transports/bifrost-http/handlers" "github.com/maximhq/bifrost/transports/bifrost-http/lib" "github.com/maximhq/bifrost/transports/bifrost-http/tracking" "github.com/prometheus/client_golang/prometheus" @@ -105,17 +116,6 @@ func init() { } } -// CompletionRequest represents a request for either text or chat completion. -// It includes all necessary fields for both types of completions. -type CompletionRequest struct { - Provider schemas.ModelProvider `json:"provider"` // The AI model provider to use - Messages []schemas.BifrostMessage `json:"messages"` // Chat messages (for chat completion) - Text string `json:"text"` // Text input (for text completion) - Model string `json:"model"` // Model to use - Params *schemas.ModelParameters `json:"params"` // Additional model parameters - Fallbacks []schemas.Fallback `json:"fallbacks"` // Fallback providers and models -} - // registerCollectorSafely attempts to register a Prometheus collector, // handling the case where it may already be registered. // It logs any errors that occur during registration, except for AlreadyRegisteredError. @@ -148,17 +148,28 @@ func main() { log.Println("Prometheus Go/Process collectors registered.") - config := lib.ReadConfig(configPath) - account := &lib.BaseAccount{Config: config.ProviderConfig} + logger := bifrost.NewDefaultLogger(schemas.LogLevelInfo) - if err := account.ReadKeys(); err != nil { - log.Printf("warning: failed to read environment variables: %v", err) + // Initialize high-performance configuration store with caching + store, err := lib.NewConfigStore(logger) + if err != nil { + log.Fatalf("failed to initialize config store: %v", err) } - if err := config.ReadMCPKeys(); err != nil { - log.Printf("warning: failed to read MCP environment variables: %v", err) + // Load configuration from JSON file into the store with full preprocessing + // This processes environment variables and stores all configurations in memory for ultra-fast access + if err := store.LoadFromConfig(configPath); err != nil { + log.Fatalf("failed to load config into store: %v", err) } + // Create account backed by the high-performance store (all processing is done in LoadFromConfig) + // The account interface now benefits from ultra-fast config access times via in-memory storage + account := lib.NewBaseAccount(store) + + // Get the processed MCP configuration from the store + // All environment variable processing is already done during LoadFromConfig + mcpConfig := store.GetMCPConfig() + loadedPlugins := []schemas.Plugin{} for _, plugin := range pluginsToLoad { @@ -191,44 +202,32 @@ func main() { InitialPoolSize: initialPoolSize, DropExcessRequests: dropExcessRequests, Plugins: loadedPlugins, - MCPConfig: config.MCPConfig, + MCPConfig: mcpConfig, + Logger: logger, }) if err != nil { log.Fatalf("failed to initialize bifrost: %v", err) } - r := router.New() - - extensions := []integrations.ExtensionRouter{ - genai.NewGenAIRouter(client), - openai.NewOpenAIRouter(client), - anthropic.NewAnthropicRouter(client), - litellm.NewLiteLLMRouter(client), - } - - r.POST("/v1/text/completions", func(ctx *fasthttp.RequestCtx) { - handleCompletion(ctx, client, false) - }) - - r.POST("/v1/chat/completions", func(ctx *fasthttp.RequestCtx) { - handleCompletion(ctx, client, true) - }) + // Initialize handlers + providerHandler := handlers.NewProviderHandler(store, client, logger) + completionHandler := handlers.NewCompletionHandler(client, logger) + mcpHandler := handlers.NewMCPHandler(client, logger) + integrationHandler := handlers.NewIntegrationHandler(client) - r.POST("/v1/mcp/tool/execute", func(ctx *fasthttp.RequestCtx) { - handleMCPToolExecution(ctx, client) - }) + r := router.New() - for _, extension := range extensions { - extension.RegisterRoutes(r) - } + // Register all handler routes + providerHandler.RegisterRoutes(r) + completionHandler.RegisterRoutes(r) + mcpHandler.RegisterRoutes(r) + integrationHandler.RegisterRoutes(r) // Add Prometheus /metrics endpoint r.GET("/metrics", fasthttpadaptor.NewFastHTTPHandler(promhttp.Handler())) r.NotFound = func(ctx *fasthttp.RequestCtx) { - ctx.SetStatusCode(fasthttp.StatusNotFound) - ctx.SetContentType("text/plain") - ctx.SetBodyString("Route not found: " + string(ctx.Path())) + handlers.SendError(ctx, fasthttp.StatusNotFound, "Route not found: "+string(ctx.Path()), logger) } server := &fasthttp.Server{ @@ -249,126 +248,3 @@ func main() { client.Cleanup() } - -// handleCompletion processes both text and chat completion requests. -// It handles request parsing, validation, and response formatting. -// -// Parameters: -// - ctx: The FastHTTP request context -// - client: The Bifrost client instance -// - isChat: Whether this is a chat completion request (true) or text completion (false) -// -// The function: -// 1. Parses the request body into a CompletionRequest -// 2. Validates required fields based on the request type -// 3. Creates a BifrostRequest with the appropriate input type -// 4. Calls the appropriate completion method on the client -// 5. Handles any errors and formats the response -func handleCompletion(ctx *fasthttp.RequestCtx, client *bifrost.Bifrost, isChat bool) { - var req CompletionRequest - if err := json.Unmarshal(ctx.PostBody(), &req); err != nil { - ctx.SetStatusCode(fasthttp.StatusBadRequest) - ctx.SetBodyString(fmt.Sprintf("invalid request format: %v", err)) - return - } - - if req.Provider == "" { - ctx.SetStatusCode(fasthttp.StatusBadRequest) - ctx.SetBodyString("Provider is required") - return - } - - bifrostReq := &schemas.BifrostRequest{ - Provider: req.Provider, - Model: req.Model, - Params: req.Params, - Fallbacks: req.Fallbacks, - } - - if isChat { - if len(req.Messages) == 0 { - ctx.SetStatusCode(fasthttp.StatusBadRequest) - ctx.SetBodyString("Messages array is required") - return - } - bifrostReq.Input = schemas.RequestInput{ - ChatCompletionInput: &req.Messages, - } - } else { - if req.Text == "" { - ctx.SetStatusCode(fasthttp.StatusBadRequest) - ctx.SetBodyString("Text is required") - return - } - bifrostReq.Input = schemas.RequestInput{ - TextCompletionInput: &req.Text, - } - } - - bifrostCtx := lib.ConvertToBifrostContext(ctx) - - var resp *schemas.BifrostResponse - var bifrostErr *schemas.BifrostError - - if bifrostCtx == nil { - ctx.SetStatusCode(fasthttp.StatusInternalServerError) - ctx.SetBodyString("Failed to convert context") - return - } - - if isChat { - resp, bifrostErr = client.ChatCompletionRequest(*bifrostCtx, bifrostReq) - } else { - resp, bifrostErr = client.TextCompletionRequest(*bifrostCtx, bifrostReq) - } - - if bifrostErr != nil { - handleBifrostError(ctx, bifrostErr) - return - } - - ctx.SetStatusCode(fasthttp.StatusOK) - ctx.SetContentType("application/json") - if encodeErr := json.NewEncoder(ctx).Encode(resp); encodeErr != nil { - ctx.SetStatusCode(fasthttp.StatusInternalServerError) - ctx.SetBodyString(fmt.Sprintf("failed to encode response: %v", encodeErr)) - } -} - -func handleMCPToolExecution(ctx *fasthttp.RequestCtx, client *bifrost.Bifrost) { - var req schemas.ToolCall - if err := json.Unmarshal(ctx.PostBody(), &req); err != nil { - ctx.SetStatusCode(fasthttp.StatusBadRequest) - ctx.SetBodyString(fmt.Sprintf("invalid request format: %v", err)) - return - } - - bifrostCtx := lib.ConvertToBifrostContext(ctx) - - resp, bifrostErr := client.ExecuteMCPTool(*bifrostCtx, req) - if bifrostErr != nil { - handleBifrostError(ctx, bifrostErr) - return - } - - ctx.SetStatusCode(fasthttp.StatusOK) - ctx.SetContentType("application/json") - if encodeErr := json.NewEncoder(ctx).Encode(resp); encodeErr != nil { - ctx.SetStatusCode(fasthttp.StatusInternalServerError) - ctx.SetBodyString(fmt.Sprintf("failed to encode response: %v", encodeErr)) - } -} - -func handleBifrostError(ctx *fasthttp.RequestCtx, bifrostErr *schemas.BifrostError) { - if bifrostErr.StatusCode != nil { - ctx.SetStatusCode(*bifrostErr.StatusCode) - } else { - ctx.SetStatusCode(fasthttp.StatusInternalServerError) - } - - ctx.SetContentType("application/json") - if encodeErr := json.NewEncoder(ctx).Encode(bifrostErr); encodeErr != nil { - ctx.SetStatusCode(fasthttp.StatusInternalServerError) - ctx.SetBodyString(fmt.Sprintf("failed to encode error response: %v", encodeErr)) - } -} diff --git a/transports/go.mod b/transports/go.mod index 6beae458f8..90b1a2245d 100644 --- a/transports/go.mod +++ b/transports/go.mod @@ -11,6 +11,8 @@ require ( google.golang.org/genai v1.4.0 ) +replace github.com/maximhq/bifrost/core => ../core + require ( cloud.google.com/go v0.121.0 // indirect cloud.google.com/go/auth v0.16.0 // indirect diff --git a/transports/go.sum b/transports/go.sum index 8bfd634261..a933cab410 100644 --- a/transports/go.sum +++ b/transports/go.sum @@ -75,8 +75,6 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0 github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/mark3labs/mcp-go v0.32.0 h1:fgwmbfL2gbd67obg57OfV2Dnrhs1HtSdlY/i5fn7MU8= github.com/mark3labs/mcp-go v0.32.0/go.mod h1:rXqOudj/djTORU/ThxYx8fqEVj/5pvTuuebQ2RC7uk4= -github.com/maximhq/bifrost/core v1.1.6 h1:rZrfPVcAfNggfBaOTdu/w+xNwDhW79bfexXsw8LRoMQ= -github.com/maximhq/bifrost/core v1.1.6/go.mod h1:yMRCncTgKYBIrECSRVxMbY3BL8CjLbipJlc644jryxc= github.com/maximhq/bifrost/plugins/maxim v1.0.6 h1:m1tWjbmxW9Lz4mDhXclQhZdFt/TrRPbZwFcoWY9ZAEk= github.com/maximhq/bifrost/plugins/maxim v1.0.6/go.mod h1:+D/E498VB4JNTEzG4fYyFJf9WQaq/9FgYrmzl49mLNc= github.com/maximhq/maxim-go v0.1.3 h1:nVzdz3hEjZVxmWHARWIM+Yrn1Jp50qrsK4BA/sz2jj8=