From f4f365f43e83edbb40005f0b211a80cc9cd9b92e Mon Sep 17 00:00:00 2001
From: Pratham Mishra <99235987+Pratham-Mishra04@users.noreply.github.com>
Date: Wed, 9 Jul 2025 01:25:28 +0530
Subject: [PATCH] fix: redact keys on save, logging fixes, and configurable log queue size
---
 .github/workflows/transport-ci.yml | 2 +-
 core/bifrost.go | 7 +-
 docs/benchmarks.md | 2 +
 transports/bifrost-http/handlers/config.go | 18 +-
 transports/bifrost-http/handlers/logging.go | 9 +-
 transports/bifrost-http/handlers/mcp.go | 28 +-
 transports/bifrost-http/handlers/providers.go | 28 +-
 transports/bifrost-http/lib/config.go | 7 +-
 transports/bifrost-http/lib/store.go | 204 ++++++++++--
 transports/bifrost-http/main.go | 2 +-
 .../bifrost-http/plugins/logging/main.go | 54 +--
 .../bifrost-http/plugins/logging/utils.go | 308 ++++++++++++++----
 transports/bifrost-http/ui/404.html | 4 +-
 transports/bifrost-http/ui/404/index.html | 4 +-
 .../ui/_next/static/build/_buildManifest.js | 2 +-
 .../static/chunks/154-7b9dae231ea16969.js | 124 -------
 .../static/chunks/213-672494f56acc68a6.js | 1 -
 ...24d72c89eac.js => 272-ea143f89da3f8b1f.js} | 4 +-
 .../static/chunks/273-9756261fec6bc01b.js | 1 +
 .../static/chunks/293-d2734c4726406a0e.js | 1 -
 .../static/chunks/341-3971b040aed697e5.js | 1 +
 .../static/chunks/341-5aa9f869e119bce4.js | 1 -
 .../static/chunks/473-3e3a48663e3561da.js | 1 -
 .../static/chunks/529-26467b76604e8781.js | 1 +
 .../static/chunks/678-56244c2aeff7b5e2.js | 1 +
 .../static/chunks/825-aee0522b5fc044c3.js | 124 +++++++
 .../app/config/page-6aaabc7109379e54.js | 1 +
 .../app/config/page-bf3c65256b3fc98b.js | 1 -
 .../chunks/app/layout-6acb57196bba0407.js | 1 +
 .../chunks/app/layout-e528237e8bb14184.js | 1 -
 .../chunks/app/page-3d9741bf5c7d7c6d.js | 1 +
 .../chunks/app/page-9c52c1dd03452de9.js | 1 -
 .../ui/_next/static/css/0290f827d14417a0.css | 1 +
 .../ui/_next/static/css/0451d03ec40c67fa.css | 1 -
 .../bifrost-http/ui/bifrost-logo-dark.png | Bin 0 -> 226623 bytes
 transports/bifrost-http/ui/bifrost-logo.png | Bin 0 -> 39354 bytes
 transports/bifrost-http/ui/config/index.html | 4 +-
 transports/bifrost-http/ui/config/index.txt | 18 +-
 transports/bifrost-http/ui/docs/index.html | 4 +-
 transports/bifrost-http/ui/docs/index.txt | 16 +-
 transports/bifrost-http/ui/favicon.ico | Bin 0 -> 15406 bytes
 transports/bifrost-http/ui/index.html | 4 +-
 transports/bifrost-http/ui/index.txt | 18 +-
 transports/bifrost-http/ui/plugins/index.html | 4 +-
 transports/bifrost-http/ui/plugins/index.txt | 16 +-
 transports/go.mod | 2 -
 ui/app/config/page.tsx | 42 +--
 ui/app/docs/page.tsx | 2 +-
 ui/app/favicon.ico | Bin 0 -> 15406 bytes
 ui/app/layout.tsx | 6 +-
 ui/app/plugins/page.tsx | 4 +-
 ui/components/config/core-settings-list.tsx | 82 ++++-
 ui/components/config/providers-list.tsx | 2 +-
 ui/components/logs/empty-state.tsx | 3 +-
 ui/components/sidebar.tsx | 22 +-
 ui/lib/api.ts | 17 +-
 ui/lib/constants/icons.tsx | 30 --
 ui/lib/types/config.ts | 1 +
 ui/public/bifrost-logo-dark.png | Bin 0 -> 226623 bytes
 ui/public/bifrost-logo.png | Bin 0 -> 39354 bytes
 60 files changed, 827 insertions(+), 417 deletions(-)
 delete mode 100644 transports/bifrost-http/ui/_next/static/chunks/154-7b9dae231ea16969.js
 delete mode 100644 transports/bifrost-http/ui/_next/static/chunks/213-672494f56acc68a6.js
 rename transports/bifrost-http/ui/_next/static/chunks/{443-5702d24d72c89eac.js => 272-ea143f89da3f8b1f.js} (75%)
 create mode 100644 transports/bifrost-http/ui/_next/static/chunks/273-9756261fec6bc01b.js
 delete mode 100644
transports/bifrost-http/ui/_next/static/chunks/293-d2734c4726406a0e.js create mode 100644 transports/bifrost-http/ui/_next/static/chunks/341-3971b040aed697e5.js delete mode 100644 transports/bifrost-http/ui/_next/static/chunks/341-5aa9f869e119bce4.js delete mode 100644 transports/bifrost-http/ui/_next/static/chunks/473-3e3a48663e3561da.js create mode 100644 transports/bifrost-http/ui/_next/static/chunks/529-26467b76604e8781.js create mode 100644 transports/bifrost-http/ui/_next/static/chunks/678-56244c2aeff7b5e2.js create mode 100644 transports/bifrost-http/ui/_next/static/chunks/825-aee0522b5fc044c3.js create mode 100644 transports/bifrost-http/ui/_next/static/chunks/app/config/page-6aaabc7109379e54.js delete mode 100644 transports/bifrost-http/ui/_next/static/chunks/app/config/page-bf3c65256b3fc98b.js create mode 100644 transports/bifrost-http/ui/_next/static/chunks/app/layout-6acb57196bba0407.js delete mode 100644 transports/bifrost-http/ui/_next/static/chunks/app/layout-e528237e8bb14184.js create mode 100644 transports/bifrost-http/ui/_next/static/chunks/app/page-3d9741bf5c7d7c6d.js delete mode 100644 transports/bifrost-http/ui/_next/static/chunks/app/page-9c52c1dd03452de9.js create mode 100644 transports/bifrost-http/ui/_next/static/css/0290f827d14417a0.css delete mode 100644 transports/bifrost-http/ui/_next/static/css/0451d03ec40c67fa.css create mode 100644 transports/bifrost-http/ui/bifrost-logo-dark.png create mode 100644 transports/bifrost-http/ui/bifrost-logo.png create mode 100644 transports/bifrost-http/ui/favicon.ico create mode 100644 ui/app/favicon.ico create mode 100644 ui/public/bifrost-logo-dark.png create mode 100644 ui/public/bifrost-logo.png diff --git a/.github/workflows/transport-ci.yml b/.github/workflows/transport-ci.yml index 6bf4548db8..57ad872b07 100644 --- a/.github/workflows/transport-ci.yml +++ b/.github/workflows/transport-ci.yml @@ -223,7 +223,7 @@ jobs: - name: Commit and push UI changes run: | - git add . + git add transports/ui if git diff --staged --quiet; then echo "No changes to commit. UI build is already up to date." else diff --git a/core/bifrost.go b/core/bifrost.go index be04b2c5ef..9d96a04eb7 100644 --- a/core/bifrost.go +++ b/core/bifrost.go @@ -568,7 +568,12 @@ func (bifrost *Bifrost) GetMCPClients() ([]schemas.MCPClient, error) { // }) func (bifrost *Bifrost) AddMCPClient(config schemas.MCPClientConfig) error { if bifrost.mcpManager == nil { - return fmt.Errorf("MCP is not configured in this Bifrost instance") + manager := &MCPManager{ + clientMap: make(map[string]*MCPClient), + logger: bifrost.logger, + } + + bifrost.mcpManager = manager } return bifrost.mcpManager.AddClient(config) diff --git a/docs/benchmarks.md b/docs/benchmarks.md index 92d8b78120..f90587654d 100644 --- a/docs/benchmarks.md +++ b/docs/benchmarks.md @@ -43,6 +43,8 @@ _\*Bifrost's overhead is measured at 59 µs on t3.medium and 11 µs on t3.xlarge **Note**: On the t3.xlarge, we tested with significantly larger response payloads (~10 KB average vs ~1 KB on t3.medium). Even so, response parsing time dropped dramatically thanks to better CPU throughput and Bifrost's optimized memory reuse. +**Disclaimer**: These metrics are measured without the UI enabled. When using the UI, there is no drop in performance - only memory usage increases due to the additional UI build being served. 
+ --- ## 🎯 Key Performance Highlights diff --git a/transports/bifrost-http/handlers/config.go b/transports/bifrost-http/handlers/config.go index e82a9d3c8d..2e4a143821 100644 --- a/transports/bifrost-http/handlers/config.go +++ b/transports/bifrost-http/handlers/config.go @@ -37,7 +37,6 @@ func NewConfigHandler(client *bifrost.Bifrost, logger schemas.Logger, store *lib func (h *ConfigHandler) RegisterRoutes(r *router.Router) { r.GET("/api/config", h.GetConfig) r.PUT("/api/config", h.handleUpdateConfig) - r.POST("/api/config/save", h.SaveConfig) } // GetConfig handles GET /config - Get the current configuration @@ -74,27 +73,22 @@ func (h *ConfigHandler) handleUpdateConfig(ctx *fasthttp.RequestCtx) { updatedConfig.InitialPoolSize = req.InitialPoolSize } + if req.LogQueueSize != currentConfig.LogQueueSize { + updatedConfig.LogQueueSize = req.LogQueueSize + } + // Update the store with the new config h.store.ClientConfig = updatedConfig - ctx.SetStatusCode(fasthttp.StatusOK) - SendJSON(ctx, map[string]any{ - "status": "success", - "message": "Configuration updated successfully", - }, h.logger) -} - -// SaveConfig handles POST /config/save - Persist current configuration to JSON file -func (h *ConfigHandler) SaveConfig(ctx *fasthttp.RequestCtx) { - // Save current configuration back to the original JSON file if err := h.store.SaveConfig(); err != nil { h.logger.Warn(fmt.Sprintf("Failed to save configuration: %v", err)) SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Failed to save configuration: %v", err), h.logger) return } + ctx.SetStatusCode(fasthttp.StatusOK) SendJSON(ctx, map[string]any{ "status": "success", - "message": "Configuration saved successfully", + "message": "Configuration updated successfully", }, h.logger) } diff --git a/transports/bifrost-http/handlers/logging.go b/transports/bifrost-http/handlers/logging.go index 6baa429e70..5fd01efc6e 100644 --- a/transports/bifrost-http/handlers/logging.go +++ b/transports/bifrost-http/handlers/logging.go @@ -32,9 +32,10 @@ func NewLoggingHandler(logManager logging.LogManager, logger schemas.Logger) *Lo func (h *LoggingHandler) RegisterRoutes(r *router.Router) { // Log retrieval with filtering, search, and pagination r.GET("/api/logs", h.GetLogs) + r.GET("/api/logs/dropped", h.GetDroppedRequests) } -// GetLogs handles GET /v1/logs - Get logs with filtering, search, and pagination via query parameters +// GetLogs handles GET /api/logs - Get logs with filtering, search, and pagination via query parameters func (h *LoggingHandler) GetLogs(ctx *fasthttp.RequestCtx) { // Parse query parameters into filters filters := &logging.SearchFilters{} @@ -139,6 +140,12 @@ func (h *LoggingHandler) GetLogs(ctx *fasthttp.RequestCtx) { SendJSON(ctx, result, h.logger) } +// GetDroppedRequests handles GET /api/logs/dropped - Get the number of dropped requests +func (h *LoggingHandler) GetDroppedRequests(ctx *fasthttp.RequestCtx) { + droppedRequests := h.logManager.GetDroppedRequests() + SendJSON(ctx, map[string]int64{"dropped_requests": droppedRequests}, h.logger) +} + // Helper functions // parseCommaSeparated splits a comma-separated string into a slice diff --git a/transports/bifrost-http/handlers/mcp.go b/transports/bifrost-http/handlers/mcp.go index 556c28eb21..9e40bc2a9b 100644 --- a/transports/bifrost-http/handlers/mcp.go +++ b/transports/bifrost-http/handlers/mcp.go @@ -72,7 +72,7 @@ func (h *MCPHandler) ExecuteTool(ctx *fasthttp.RequestCtx) { SendJSON(ctx, resp, h.logger) } -// GetMCPClients handles GET /mcp/clients - Get all MCP 
clients +// GetMCPClients handles GET /api/mcp/clients - Get all MCP clients func (h *MCPHandler) GetMCPClients(ctx *fasthttp.RequestCtx) { // Get clients from store config configsInStore := h.store.MCPConfig @@ -120,7 +120,7 @@ func (h *MCPHandler) GetMCPClients(ctx *fasthttp.RequestCtx) { SendJSON(ctx, clients, h.logger) } -// ReconnectMCPClient handles POST /mcp/client/{name}/reconnect - Reconnect an MCP client +// ReconnectMCPClient handles POST /api/mcp/client/{name}/reconnect - Reconnect an MCP client func (h *MCPHandler) ReconnectMCPClient(ctx *fasthttp.RequestCtx) { name, err := getNameFromCtx(ctx) if err != nil { @@ -139,7 +139,7 @@ func (h *MCPHandler) ReconnectMCPClient(ctx *fasthttp.RequestCtx) { }, h.logger) } -// AddMCPClient handles POST /mcp/client - Add a new MCP client +// AddMCPClient handles POST /api/mcp/client - Add a new MCP client func (h *MCPHandler) AddMCPClient(ctx *fasthttp.RequestCtx) { var req schemas.MCPClientConfig if err := json.Unmarshal(ctx.PostBody(), &req); err != nil { @@ -152,13 +152,19 @@ func (h *MCPHandler) AddMCPClient(ctx *fasthttp.RequestCtx) { return } + if err := h.store.SaveConfig(); err != nil { + h.logger.Warn(fmt.Sprintf("Failed to save configuration: %v", err)) + SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Failed to save configuration: %v", err), h.logger) + return + } + SendJSON(ctx, map[string]any{ "status": "success", "message": "MCP client added successfully", }, h.logger) } -// EditMCPClientTools handles PUT /mcp/client/{name} - Edit MCP client tools +// EditMCPClientTools handles PUT /api/mcp/client/{name} - Edit MCP client tools func (h *MCPHandler) EditMCPClientTools(ctx *fasthttp.RequestCtx) { name, err := getNameFromCtx(ctx) if err != nil { @@ -180,13 +186,19 @@ func (h *MCPHandler) EditMCPClientTools(ctx *fasthttp.RequestCtx) { return } + if err := h.store.SaveConfig(); err != nil { + h.logger.Warn(fmt.Sprintf("Failed to save configuration: %v", err)) + SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Failed to save configuration: %v", err), h.logger) + return + } + SendJSON(ctx, map[string]any{ "status": "success", "message": "MCP client tools edited successfully", }, h.logger) } -// RemoveMCPClient handles DELETE /mcp/client/{name} - Remove an MCP client +// RemoveMCPClient handles DELETE /api/mcp/client/{name} - Remove an MCP client func (h *MCPHandler) RemoveMCPClient(ctx *fasthttp.RequestCtx) { name, err := getNameFromCtx(ctx) if err != nil { @@ -199,6 +211,12 @@ func (h *MCPHandler) RemoveMCPClient(ctx *fasthttp.RequestCtx) { return } + if err := h.store.SaveConfig(); err != nil { + h.logger.Warn(fmt.Sprintf("Failed to save configuration: %v", err)) + SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Failed to save configuration: %v", err), h.logger) + return + } + SendJSON(ctx, map[string]any{ "status": "success", "message": "MCP client removed successfully", diff --git a/transports/bifrost-http/handlers/providers.go b/transports/bifrost-http/handlers/providers.go index 5684de5aa6..0dd5afd964 100644 --- a/transports/bifrost-http/handlers/providers.go +++ b/transports/bifrost-http/handlers/providers.go @@ -82,7 +82,7 @@ func (h *ProviderHandler) RegisterRoutes(r *router.Router) { r.DELETE("/api/providers/{provider}", h.DeleteProvider) } -// ListProviders handles GET /providers - List all providers +// ListProviders handles GET /api/providers - List all providers func (h *ProviderHandler) ListProviders(ctx *fasthttp.RequestCtx) { providers, err := h.store.GetAllProviders() 
if err != nil { @@ -119,7 +119,7 @@ func (h *ProviderHandler) ListProviders(ctx *fasthttp.RequestCtx) { SendJSON(ctx, response, h.logger) } -// GetProvider handles GET /providers/{provider} - Get specific provider +// GetProvider handles GET /api/providers/{provider} - Get specific provider func (h *ProviderHandler) GetProvider(ctx *fasthttp.RequestCtx) { provider, err := getProviderFromCtx(ctx) if err != nil { @@ -138,7 +138,7 @@ func (h *ProviderHandler) GetProvider(ctx *fasthttp.RequestCtx) { SendJSON(ctx, response, h.logger) } -// AddProvider handles POST /providers - Add a new provider +// AddProvider handles POST /api/providers - Add a new provider func (h *ProviderHandler) AddProvider(ctx *fasthttp.RequestCtx) { var req AddProviderRequest if err := json.Unmarshal(ctx.PostBody(), &req); err != nil { @@ -200,6 +200,12 @@ func (h *ProviderHandler) AddProvider(ctx *fasthttp.RequestCtx) { return } + if err := h.store.SaveConfig(); err != nil { + h.logger.Warn(fmt.Sprintf("Failed to save configuration: %v", err)) + SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Failed to save configuration: %v", err), h.logger) + return + } + h.logger.Info(fmt.Sprintf("Provider %s added successfully", req.Provider)) response := h.getProviderResponseFromConfig(req.Provider, config) @@ -207,7 +213,7 @@ func (h *ProviderHandler) AddProvider(ctx *fasthttp.RequestCtx) { SendJSON(ctx, response, h.logger) } -// UpdateProvider handles PUT /providers/{provider} - Update provider config +// UpdateProvider handles PUT /api/providers/{provider} - Update provider config // NOTE: This endpoint expects ALL fields to be provided in the request body, // including both edited and non-edited fields. Partial updates are not supported. // The frontend should send the complete provider configuration. 
@@ -340,6 +346,12 @@ func (h *ProviderHandler) UpdateProvider(ctx *fasthttp.RequestCtx) { return } + if err := h.store.SaveConfig(); err != nil { + h.logger.Warn(fmt.Sprintf("Failed to save configuration: %v", err)) + SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Failed to save configuration: %v", err), h.logger) + return + } + if config.ConcurrencyAndBufferSize.Concurrency != oldConfigRaw.ConcurrencyAndBufferSize.Concurrency || config.ConcurrencyAndBufferSize.BufferSize != oldConfigRaw.ConcurrencyAndBufferSize.BufferSize { // Update concurrency and queue configuration in Bifrost @@ -354,7 +366,7 @@ func (h *ProviderHandler) UpdateProvider(ctx *fasthttp.RequestCtx) { SendJSON(ctx, response, h.logger) } -// DeleteProvider handles DELETE /providers/{provider} - Remove provider +// DeleteProvider handles DELETE /api/providers/{provider} - Remove provider func (h *ProviderHandler) DeleteProvider(ctx *fasthttp.RequestCtx) { provider, err := getProviderFromCtx(ctx) if err != nil { @@ -375,6 +387,12 @@ func (h *ProviderHandler) DeleteProvider(ctx *fasthttp.RequestCtx) { return } + if err := h.store.SaveConfig(); err != nil { + h.logger.Warn(fmt.Sprintf("Failed to save configuration: %v", err)) + SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Failed to save configuration: %v", err), h.logger) + return + } + h.logger.Info(fmt.Sprintf("Provider %s removed successfully", provider)) response := ProviderResponse{ diff --git a/transports/bifrost-http/lib/config.go b/transports/bifrost-http/lib/config.go index b441618d38..31dd113ba5 100644 --- a/transports/bifrost-http/lib/config.go +++ b/transports/bifrost-http/lib/config.go @@ -9,9 +9,10 @@ import ( // ClientConfig represents the core configuration for Bifrost HTTP transport and the Bifrost Client. // It includes settings for excess request handling, Prometheus metrics, and initial pool size. type ClientConfig struct { - DropExcessRequests bool `json:"drop_excess_requests"` - InitialPoolSize int `json:"initial_pool_size"` - PrometheusLabels []string `json:"prometheus_labels"` + DropExcessRequests bool `json:"drop_excess_requests"` // Drop excess requests if the provider queue is full + InitialPoolSize int `json:"initial_pool_size"` // The initial pool size for the bifrost client + PrometheusLabels []string `json:"prometheus_labels"` // The labels to be used for prometheus metrics + LogQueueSize int `json:"log_queue_size"` // The size of the log queue, additional requests will be dropped (not saved for ui) if the queue is full } // ProviderConfig represents the configuration for a specific AI model provider. diff --git a/transports/bifrost-http/lib/store.go b/transports/bifrost-http/lib/store.go index dda9dda50a..4d2b7af6de 100644 --- a/transports/bifrost-http/lib/store.go +++ b/transports/bifrost-http/lib/store.go @@ -49,6 +49,13 @@ type EnvKeyInfo struct { ConfigPath string // Path in config where this env var is used } +var DefaultClientConfig = ClientConfig{ + DropExcessRequests: false, + PrometheusLabels: []string{}, + InitialPoolSize: 300, + LogQueueSize: 1000, +} + // NewConfigStore creates a new in-memory configuration store instance. func NewConfigStore(logger schemas.Logger) (*ConfigStore, error) { return &ConfigStore{ @@ -83,14 +90,10 @@ func (s *ConfigStore) LoadFromConfig(configPath string) error { data, err := os.ReadFile(configPath) if err != nil { if os.IsNotExist(err) { - s.logger.Info(fmt.Sprintf("Config file %s not found, starting with default configuration. 
Providers can be added dynamically via API.", configPath)) + s.logger.Info(fmt.Sprintf("Config file %s not found, starting with default configuration. Providers can be added dynamically via UI.", configPath)) // Initialize with default configuration - s.ClientConfig = ClientConfig{ - DropExcessRequests: false, - PrometheusLabels: []string{}, - InitialPoolSize: 300, - } + s.ClientConfig = DefaultClientConfig s.Providers = make(map[schemas.ModelProvider]ProviderConfig) s.MCPConfig = nil @@ -122,12 +125,7 @@ func (s *ConfigStore) LoadFromConfig(configPath string) error { } s.ClientConfig = clientConfig } else { - // Default client configuration - s.ClientConfig = ClientConfig{ - DropExcessRequests: false, - PrometheusLabels: []string{}, - InitialPoolSize: 300, - } + s.ClientConfig = DefaultClientConfig } // Process provider configurations @@ -246,15 +244,23 @@ func (s *ConfigStore) processEnvValue(value string) (string, string, error) { return value, "", nil } -// WriteConfigToFile writes the current in-memory configuration back to a JSON file +// writeConfigToFile writes the current in-memory configuration back to a JSON file // in the exact same format that LoadFromConfig expects. This enables persistence -// of runtime configuration changes. -func (s *ConfigStore) WriteConfigToFile(configPath string) error { +// of runtime configuration changes with environment variable references restored. +func (s *ConfigStore) writeConfigToFile(configPath string) error { s.mu.RLock() defer s.mu.RUnlock() s.logger.Info(fmt.Sprintf("Writing current configuration to: %s", configPath)) + // Create a map for quick lookup of env vars by provider and path + envVarsByPath := make(map[string]string) + for envVar, infos := range s.EnvKeys { + for _, info := range infos { + envVarsByPath[info.ConfigPath] = envVar + } + } + // Prepare the output structure output := struct { Providers map[string]interface{} `json:"providers"` @@ -262,17 +268,33 @@ func (s *ConfigStore) WriteConfigToFile(configPath string) error { Client ClientConfig `json:"client,omitempty"` }{ Providers: make(map[string]interface{}), - MCP: s.MCPConfig, + MCP: s.getRestoredMCPConfig(envVarsByPath), Client: s.ClientConfig, } - // Convert providers back to the original format + // Convert providers back to the original format with env variable restoration for provider, config := range s.Providers { providerName := string(provider) - // Create provider config without processed values (keep env.* references) + // Create redacted keys that restore env.* references + redactedKeys := make([]schemas.Key, len(config.Keys)) + for i, key := range config.Keys { + redactedKeys[i] = schemas.Key{ + Models: key.Models, + Weight: key.Weight, + } + + path := fmt.Sprintf("providers.%s.keys[%d]", provider, i) + if envVar, ok := envVarsByPath[path]; ok { + redactedKeys[i].Value = "env." 
+ envVar + } else { + redactedKeys[i].Value = key.Value // Keep actual value, no asterisk redaction + } + } + + // Create provider config with restored env references providerConfig := map[string]interface{}{ - "keys": config.Keys, // Note: This will contain actual values, not env refs + "keys": redactedKeys, } if config.NetworkConfig != nil { @@ -283,8 +305,10 @@ func (s *ConfigStore) WriteConfigToFile(configPath string) error { providerConfig["concurrency_and_buffer_size"] = config.ConcurrencyAndBufferSize } + // Handle meta config with env variable restoration if config.MetaConfig != nil { - providerConfig["meta_config"] = *config.MetaConfig + restoredMetaConfig := s.restoreMetaConfigEnvVars(provider, *config.MetaConfig, envVarsByPath) + providerConfig["meta_config"] = restoredMetaConfig } output.Providers[providerName] = providerConfig @@ -305,12 +329,148 @@ func (s *ConfigStore) WriteConfigToFile(configPath string) error { return nil } +// getRestoredMCPConfig creates a copy of MCP config with env variable references restored +func (s *ConfigStore) getRestoredMCPConfig(envVarsByPath map[string]string) *schemas.MCPConfig { + if s.MCPConfig == nil { + return nil + } + + // Create a copy of the MCP config + mcpConfigCopy := &schemas.MCPConfig{ + ClientConfigs: make([]schemas.MCPClientConfig, len(s.MCPConfig.ClientConfigs)), + } + + // Process each client config + for i, clientConfig := range s.MCPConfig.ClientConfigs { + configCopy := schemas.MCPClientConfig{ + Name: clientConfig.Name, + ConnectionType: clientConfig.ConnectionType, + StdioConfig: clientConfig.StdioConfig, + ToolsToExecute: append([]string{}, clientConfig.ToolsToExecute...), + ToolsToSkip: append([]string{}, clientConfig.ToolsToSkip...), + } + + // Handle connection string with env variable restoration + if clientConfig.ConnectionString != nil { + connStr := *clientConfig.ConnectionString + path := fmt.Sprintf("mcp.client_configs[%d].connection_string", i) + if envVar, ok := envVarsByPath[path]; ok { + connStr = "env." + envVar + } + // If not from env var, keep actual value (no asterisk redaction) + configCopy.ConnectionString = &connStr + } + + mcpConfigCopy.ClientConfigs[i] = configCopy + } + + return mcpConfigCopy +} + +// restoreMetaConfigEnvVars creates a copy of meta config with env variable references restored +func (s *ConfigStore) restoreMetaConfigEnvVars(provider schemas.ModelProvider, metaConfig schemas.MetaConfig, envVarsByPath map[string]string) interface{} { + switch m := metaConfig.(type) { + case *meta.AzureMetaConfig: + azureConfig := *m // Copy the struct + + // Restore endpoint if it came from env var + path := fmt.Sprintf("providers.%s.meta_config.endpoint", provider) + if envVar, ok := envVarsByPath[path]; ok { + azureConfig.Endpoint = "env." + envVar + } + // Otherwise keep actual value (no asterisk redaction) + + // Restore API version if it came from env var + if azureConfig.APIVersion != nil { + path = fmt.Sprintf("providers.%s.meta_config.api_version", provider) + if envVar, ok := envVarsByPath[path]; ok { + apiVersion := "env." + envVar + azureConfig.APIVersion = &apiVersion + } + // Otherwise keep actual value (no asterisk redaction) + } + + return azureConfig + + case *meta.BedrockMetaConfig: + bedrockConfig := *m // Copy the struct + + // Restore secret access key if it came from env var + path := fmt.Sprintf("providers.%s.meta_config.secret_access_key", provider) + if envVar, ok := envVarsByPath[path]; ok { + bedrockConfig.SecretAccessKey = "env." 
+ envVar + } + // Otherwise keep actual value (no asterisk redaction) + + // Restore region if it came from env var + if bedrockConfig.Region != nil { + path = fmt.Sprintf("providers.%s.meta_config.region", provider) + if envVar, ok := envVarsByPath[path]; ok { + region := "env." + envVar + bedrockConfig.Region = ®ion + } + // Otherwise keep actual value (no asterisk redaction) + } + + // Restore session token if it came from env var + if bedrockConfig.SessionToken != nil { + path = fmt.Sprintf("providers.%s.meta_config.session_token", provider) + if envVar, ok := envVarsByPath[path]; ok { + sessionToken := "env." + envVar + bedrockConfig.SessionToken = &sessionToken + } + // Otherwise keep actual value (no asterisk redaction) + } + + // Restore ARN if it came from env var + if bedrockConfig.ARN != nil { + path = fmt.Sprintf("providers.%s.meta_config.arn", provider) + if envVar, ok := envVarsByPath[path]; ok { + arn := "env." + envVar + bedrockConfig.ARN = &arn + } + // Otherwise keep actual value (no asterisk redaction) + } + + return bedrockConfig + + case *meta.VertexMetaConfig: + vertexConfig := *m // Copy the struct + + // Restore project ID if it came from env var + path := fmt.Sprintf("providers.%s.meta_config.project_id", provider) + if envVar, ok := envVarsByPath[path]; ok { + vertexConfig.ProjectID = "env." + envVar + } + // Otherwise keep actual value (no asterisk redaction) + + // Restore region if it came from env var + path = fmt.Sprintf("providers.%s.meta_config.region", provider) + if envVar, ok := envVarsByPath[path]; ok { + vertexConfig.Region = "env." + envVar + } + // Otherwise keep actual value (no asterisk redaction) + + // Restore auth credentials if it came from env var + path = fmt.Sprintf("providers.%s.meta_config.auth_credentials", provider) + if envVar, ok := envVarsByPath[path]; ok { + vertexConfig.AuthCredentials = "env." + envVar + } + // Otherwise keep actual value (no asterisk redaction) + + return vertexConfig + + default: + return metaConfig + } +} + // SaveConfig writes the current configuration back to the original config file path func (s *ConfigStore) SaveConfig() error { if s.configPath == "" { return fmt.Errorf("no config path set - use LoadFromConfig first") } - return s.WriteConfigToFile(s.configPath) + return s.writeConfigToFile(s.configPath) } // parseMetaConfig converts raw JSON to the appropriate provider-specific meta config interface @@ -1263,7 +1423,5 @@ func (s *ConfigStore) autoDetectProviders() { if detectedCount > 0 { s.logger.Info(fmt.Sprintf("Auto-configured %d provider(s) from environment variables", detectedCount)) - } else { - s.logger.Info("No common provider environment variables detected. 
Use the web UI or configuration file to add providers.") } } diff --git a/transports/bifrost-http/main.go b/transports/bifrost-http/main.go index b8105f4dce..495089abf0 100644 --- a/transports/bifrost-http/main.go +++ b/transports/bifrost-http/main.go @@ -296,7 +296,7 @@ func main() { // Initialize logging plugin with app-dir based path loggingConfig := &logging.Config{ DatabasePath: logDir, - LogQueueSize: 1000, + LogQueueSize: store.ClientConfig.LogQueueSize, } loggingPlugin, err := logging.NewLoggerPlugin(loggingConfig, logger) diff --git a/transports/bifrost-http/plugins/logging/main.go b/transports/bifrost-http/plugins/logging/main.go index f20ef61b8b..97b9c07a43 100644 --- a/transports/bifrost-http/plugins/logging/main.go +++ b/transports/bifrost-http/plugins/logging/main.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "github.com/dgraph-io/badger/v4" @@ -25,6 +26,7 @@ const ( // Index types ProviderIndex = "provider:" ModelIndex = "model:" + ObjectIndex = "object:" TimestampIndex = "timestamp:" StatusIndex = "status:" LatencyIndex = "latency:" @@ -119,15 +121,16 @@ type LogCallback func(*LogEntry) // LoggerPlugin implements the schemas.Plugin interface type LoggerPlugin struct { - config *Config - db *badger.DB - mu sync.RWMutex - stats *LogStats - logQueue chan *LogEntry - done chan struct{} - wg sync.WaitGroup - logger schemas.Logger - logCallback LogCallback // Callback for real-time log updates + config *Config + db *badger.DB + mu sync.RWMutex + stats *LogStats + logQueue chan *LogEntry + done chan struct{} + wg sync.WaitGroup + logger schemas.Logger + logCallback LogCallback // Callback for real-time log updates + droppedRequests atomic.Int64 } // NewLoggerPlugin creates a new logging plugin @@ -151,7 +154,7 @@ func NewLoggerPlugin(config *Config, logger schemas.Logger) (*LoggerPlugin, erro plugin := &LoggerPlugin{ config: config, db: db, - logQueue: make(chan *LogEntry, config.LogQueueSize), // Buffer for 1000 log entries + logQueue: make(chan *LogEntry, config.LogQueueSize), done: make(chan struct{}), logger: logger, stats: &LogStats{ @@ -277,8 +280,9 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes // Calculate latency latency := float64(time.Since(startTime).Milliseconds()) - // Create log entry + // Create log entry with guaranteed unique ID logEntry := &LogEntry{ + ID: uuid.New().String(), // Always generate a unique ID Timestamp: startTime, } @@ -308,11 +312,20 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes logEntry.Status = "success" if result != nil { - logEntry.ID = result.ID - logEntry.Object = result.Object + // Use result ID if available, otherwise keep the generated UUID + if result.ID != "" { + logEntry.ID = result.ID + } logEntry.Model = result.Model logEntry.Latency = &latency logEntry.TokenUsage = &result.Usage + logEntry.Object = result.Object + + if ctx != nil && result.Object == "" { + if object, ok := (*ctx).Value(RequestObjectKey).(string); ok { + logEntry.Object = object + } + } // Handle ExtraFields safely // Set provider if available @@ -324,10 +337,6 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes if result.ExtraFields.Params.Tools != nil { logEntry.Tools = result.ExtraFields.Params.Tools logEntry.Params = &result.ExtraFields.Params - - if result.ID == "" { - logEntry.ID = uuid.New().String() - } } // Extract chat history if available @@ -350,10 +359,12 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result 
*schemas.BifrostRes if result.ExtraFields.ChatHistory != nil { logEntry.InputHistory = *result.ExtraFields.ChatHistory } else { - if chatHistory, ok := (*ctx).Value(RequestChatHistory).([]schemas.BifrostMessage); ok { - logEntry.InputHistory = chatHistory - } else { - logEntry.InputHistory = []schemas.BifrostMessage{} + if ctx != nil { + if chatHistory, ok := (*ctx).Value(RequestChatHistory).([]schemas.BifrostMessage); ok { + logEntry.InputHistory = chatHistory + } else { + logEntry.InputHistory = []schemas.BifrostMessage{} + } } } @@ -371,6 +382,7 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes default: // Queue is full, log warning but don't block the request p.logger.Warn("log queue is full, dropping log entry") + p.droppedRequests.Add(1) } return result, err, nil diff --git a/transports/bifrost-http/plugins/logging/utils.go b/transports/bifrost-http/plugins/logging/utils.go index 7642b9922c..2e7db2805d 100644 --- a/transports/bifrost-http/plugins/logging/utils.go +++ b/transports/bifrost-http/plugins/logging/utils.go @@ -60,6 +60,14 @@ func (p *LoggerPlugin) createIndexes(txn *badger.Txn, entry *LogEntry) error { } } + // Object index + if entry.Object != "" { + objectKey := fmt.Sprintf("%s%s%s:%d:%s", IndexPrefix, ObjectIndex, entry.Object, timestamp, entry.ID) + if err := txn.Set([]byte(objectKey), []byte(entry.ID)); err != nil { + return err + } + } + // Timestamp index timestampKey := fmt.Sprintf("%s%s%d:%s", IndexPrefix, TimestampIndex, timestamp, entry.ID) if err := txn.Set([]byte(timestampKey), []byte(entry.ID)); err != nil { @@ -93,13 +101,10 @@ func (p *LoggerPlugin) createIndexes(txn *badger.Txn, entry *LogEntry) error { return nil } -// SearchLogs searches for log entries based on filters and pagination options +// SearchLogs searches for log entries based on filters and pagination func (p *LoggerPlugin) SearchLogs(filters *SearchFilters, pagination *PaginationOptions) (*SearchResult, error) { - var result SearchResult - var successfulRequests int64 - var totalLatency float64 - var logsWithLatency int - var totalTokens int64 + p.mu.RLock() + defer p.mu.RUnlock() if pagination == nil { pagination = &PaginationOptions{ @@ -110,67 +115,46 @@ func (p *LoggerPlugin) SearchLogs(filters *SearchFilters, pagination *Pagination } } - // Initialize result stats - result.Stats.TotalRequests = 0 - result.Stats.SuccessRate = 0 - result.Stats.AverageLatency = 0 - result.Stats.TotalTokens = 0 - result.Pagination = *pagination + var matchingIDs []string + var allLogs []LogEntry + seenIDs := make(map[string]bool) + + // Statistics variables + var successfulRequests int64 + var totalLatency float64 + var totalTokens int64 + var logsWithLatency int64 err := p.db.View(func(txn *badger.Txn) error { - // Get matching IDs using indexes - var matchingIDs []string if filters != nil { + // Use indexes for efficient filtering matchingIDs = p.searchWithIndexes(txn, filters) } else { + // Fallback to full scan if indexing is disabled matchingIDs = p.searchFullScan(txn) } - // Early return if no matches - if len(matchingIDs) == 0 { - result.Stats.TotalRequests = 0 - return nil - } - - // Sort IDs based on pagination options - p.sortIDs(txn, matchingIDs, pagination.SortBy, pagination.Order) - - // Calculate total for stats - result.Stats.TotalRequests = int64(len(matchingIDs)) - - // Apply offset and limit for efficient pagination - start := pagination.Offset - if start >= len(matchingIDs) { - return nil - } - end := min(start+pagination.Limit, len(matchingIDs)) - 
pageIDs := matchingIDs[start:end] + // Fetch all matching logs, deduplicating by ID + for _, id := range matchingIDs { + if !seenIDs[id] { + if entry, err := p.getLogEntryByID(txn, id); err == nil && p.matchesFilters(entry, filters) { + allLogs = append(allLogs, *entry) + seenIDs[id] = true - // Fetch only the required log entries for the current page - for _, id := range pageIDs { - entry, err := p.getLogEntryByID(txn, id) - if err != nil { - continue - } - - // Verify the entry matches all filters - if p.matchesFilters(entry, filters) { - result.Logs = append(result.Logs, *entry) - - // Update statistics - if entry.Status == "success" { - successfulRequests++ - } - if entry.Latency != nil { - totalLatency += *entry.Latency - logsWithLatency++ - } - if entry.TokenUsage != nil { - totalTokens += int64(entry.TokenUsage.TotalTokens) + // Update statistics + if entry.Status == "success" { + successfulRequests++ + } + if entry.Latency != nil { + totalLatency += *entry.Latency + logsWithLatency++ + } + if entry.TokenUsage != nil { + totalTokens += int64(entry.TokenUsage.TotalTokens) + } } } } - return nil }) @@ -178,16 +162,43 @@ func (p *LoggerPlugin) SearchLogs(filters *SearchFilters, pagination *Pagination return nil, err } - // Calculate final statistics - if result.Stats.TotalRequests > 0 { - result.Stats.SuccessRate = float64(successfulRequests) / float64(result.Stats.TotalRequests) * 100 + // Sort logs based on pagination options + p.sortLogs(allLogs, pagination.SortBy, pagination.Order) + + // Apply pagination + total := len(allLogs) + start := pagination.Offset + end := min(pagination.Offset+pagination.Limit, total) + if start > total { + start = total } - if logsWithLatency > 0 { - result.Stats.AverageLatency = totalLatency / float64(logsWithLatency) + + // Calculate final statistics + var successRate float64 + if total > 0 { + successRate = float64(successfulRequests) / float64(total) * 100 } - result.Stats.TotalTokens = totalTokens - return &result, nil + var averageLatency float64 + if logsWithLatency > 0 { + averageLatency = totalLatency / float64(logsWithLatency) + } + + return &SearchResult{ + Logs: allLogs[start:end], + Pagination: *pagination, + Stats: struct { + TotalRequests int64 `json:"total_requests"` + SuccessRate float64 `json:"success_rate"` + AverageLatency float64 `json:"average_latency"` + TotalTokens int64 `json:"total_tokens"` + }{ + TotalRequests: int64(total), + SuccessRate: successRate, + AverageLatency: averageLatency, + TotalTokens: totalTokens, + }, + }, nil } // searchWithIndexes uses indexes to find matching log IDs efficiently @@ -232,6 +243,38 @@ func (p *LoggerPlugin) searchWithIndexes(txn *badger.Txn, filters *SearchFilters } } + if len(filters.Objects) > 0 { + objectIDs := p.searchByObjects(txn, filters.Objects) + if !hasFilters { + candidateIDs = objectIDs + hasFilters = true + } else { + candidateIDs = p.intersectIDLists(candidateIDs, objectIDs) + } + } + + // Latency range filtering (using buckets for efficiency) + if filters.MinLatency != nil || filters.MaxLatency != nil { + latencyIDs := p.searchByLatencyRange(txn, filters.MinLatency, filters.MaxLatency) + if !hasFilters { + candidateIDs = latencyIDs + hasFilters = true + } else { + candidateIDs = p.intersectIDLists(candidateIDs, latencyIDs) + } + } + + // Token range filtering (using buckets for efficiency) + if filters.MinTokens != nil || filters.MaxTokens != nil { + tokenIDs := p.searchByTokenRange(txn, filters.MinTokens, filters.MaxTokens) + if !hasFilters { + candidateIDs = tokenIDs + 
hasFilters = true + } else { + candidateIDs = p.intersectIDLists(candidateIDs, tokenIDs) + } + } + // If no filters were applied, return all logs if !hasFilters { return p.searchFullScan(txn) @@ -284,8 +327,8 @@ func (p *LoggerPlugin) searchByTimeRange(txn *badger.Txn, startTime, endTime *ti if err := item.Value(func(val []byte) error { ids = append(ids, string(val)) return nil - }); err == nil { - // Continue to next item + }); err != nil { + // Log error but continue processing } } } @@ -385,6 +428,136 @@ func (p *LoggerPlugin) searchByStatus(txn *badger.Txn, statuses []string) []stri return ids } +func (p *LoggerPlugin) searchByObjects(txn *badger.Txn, objects []string) []string { + idMap := make(map[string]bool) + + for _, object := range objects { + prefix := []byte(IndexPrefix + ObjectIndex + object + ":") + opts := badger.DefaultIteratorOptions + opts.PrefetchValues = false + it := txn.NewIterator(opts) + + for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + item := it.Item() + if err := item.Value(func(val []byte) error { + idMap[string(val)] = true + return nil + }); err == nil { + // Continue + } + } + it.Close() + } + + // Convert map to slice + var ids []string + for id := range idMap { + ids = append(ids, id) + } + + return ids +} + +func (p *LoggerPlugin) searchByLatencyRange(txn *badger.Txn, minLatency, maxLatency *float64) []string { + idMap := make(map[string]bool) + + // Determine which latency buckets to search + minBucket := 0 + maxBucket := int(math.Pow(10, 6)) // Very large bucket + + if minLatency != nil && *minLatency > 0 { + minBucket = getLatencyBucket(*minLatency) + } + if maxLatency != nil && *maxLatency > 0 { + maxBucket = getLatencyBucket(*maxLatency) + } + + // Search through relevant latency buckets + opts := badger.DefaultIteratorOptions + opts.PrefetchValues = false + it := txn.NewIterator(opts) + defer it.Close() + + prefix := []byte(IndexPrefix + LatencyIndex) + for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + item := it.Item() + key := string(item.Key()) + + // Extract bucket from key + parts := strings.Split(strings.TrimPrefix(key, IndexPrefix+LatencyIndex), ":") + if len(parts) >= 3 { + if bucket, err := strconv.Atoi(parts[0]); err == nil { + if bucket >= minBucket && bucket <= maxBucket { + if err := item.Value(func(val []byte) error { + idMap[string(val)] = true + return nil + }); err != nil { + // Log error but continue + } + } + } + } + } + + // Convert map to slice + var ids []string + for id := range idMap { + ids = append(ids, id) + } + + return ids +} + +func (p *LoggerPlugin) searchByTokenRange(txn *badger.Txn, minTokens, maxTokens *int) []string { + idMap := make(map[string]bool) + + // Determine which token buckets to search + minBucket := 0 + maxBucket := int(math.Pow(2, 20)) // Very large bucket + + if minTokens != nil && *minTokens > 0 { + minBucket = getTokenBucket(*minTokens) + } + if maxTokens != nil && *maxTokens > 0 { + maxBucket = getTokenBucket(*maxTokens) + } + + // Search through relevant token buckets + opts := badger.DefaultIteratorOptions + opts.PrefetchValues = false + it := txn.NewIterator(opts) + defer it.Close() + + prefix := []byte(IndexPrefix + TokenIndex) + for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + item := it.Item() + key := string(item.Key()) + + // Extract bucket from key + parts := strings.Split(strings.TrimPrefix(key, IndexPrefix+TokenIndex), ":") + if len(parts) >= 3 { + if bucket, err := strconv.Atoi(parts[0]); err == nil { + if bucket >= minBucket && bucket <= maxBucket 
{ + if err := item.Value(func(val []byte) error { + idMap[string(val)] = true + return nil + }); err != nil { + // Log error but continue + } + } + } + } + } + + // Convert map to slice + var ids []string + for id := range idMap { + ids = append(ids, id) + } + + return ids +} + // intersectIDLists returns the intersection of two ID lists func (p *LoggerPlugin) intersectIDLists(list1, list2 []string) []string { if len(list1) == 0 || len(list2) == 0 { @@ -649,6 +822,9 @@ func getTokenBucket(tokens int) int { type LogManager interface { // Search searches for log entries based on filters and pagination Search(filters *SearchFilters, pagination *PaginationOptions) (*SearchResult, error) + + // Get the number of dropped requests + GetDroppedRequests() int64 } type PluginLogManager struct { @@ -659,6 +835,10 @@ func (p *PluginLogManager) Search(filters *SearchFilters, pagination *Pagination return p.plugin.SearchLogs(filters, pagination) } +func (p *PluginLogManager) GetDroppedRequests() int64 { + return p.plugin.droppedRequests.Load() +} + func (p *LoggerPlugin) GetPluginLogManager() *PluginLogManager { return &PluginLogManager{ plugin: p, diff --git a/transports/bifrost-http/ui/404.html b/transports/bifrost-http/ui/404.html index db7718c3aa..e06483eca3 100644 --- a/transports/bifrost-http/ui/404.html +++ b/transports/bifrost-http/ui/404.html @@ -1,4 +1,4 @@ -