From 7b8a7cdeeb023977b4a86eb744d2cdc0713d37df Mon Sep 17 00:00:00 2001 From: Pratham Mishra <99235987+Pratham-Mishra04@users.noreply.github.com> Date: Fri, 4 Jul 2025 00:09:58 +0530 Subject: [PATCH] feat: logging added to transport --- core/mcp.go | 2 +- transports/bifrost-http/handlers/logging.go | 158 ++++ transports/bifrost-http/handlers/providers.go | 117 +-- transports/bifrost-http/handlers/websocket.go | 163 +++++ transports/bifrost-http/lib/config.go | 1 + transports/bifrost-http/lib/ctx.go | 4 +- transports/bifrost-http/main.go | 27 +- .../bifrost-http/plugins/logging/main.go | 373 ++++++++++ .../bifrost-http/plugins/logging/utils.go | 672 ++++++++++++++++++ .../telemetry}/docker-compose.yml | 0 .../plugin.go => plugins/telemetry/main.go} | 10 +- .../telemetry}/prometheus.yml | 0 .../{tracking => plugins/telemetry}/setup.go | 4 +- transports/go.mod | 7 +- transports/go.sum | 12 + 15 files changed, 1481 insertions(+), 69 deletions(-) create mode 100644 transports/bifrost-http/handlers/logging.go create mode 100644 transports/bifrost-http/handlers/websocket.go create mode 100644 transports/bifrost-http/plugins/logging/main.go create mode 100644 transports/bifrost-http/plugins/logging/utils.go rename transports/bifrost-http/{tracking => plugins/telemetry}/docker-compose.yml (100%) rename transports/bifrost-http/{tracking/plugin.go => plugins/telemetry/main.go} (95%) rename transports/bifrost-http/{tracking => plugins/telemetry}/prometheus.yml (100%) rename transports/bifrost-http/{tracking => plugins/telemetry}/setup.go (98%) diff --git a/core/mcp.go b/core/mcp.go index 0e74811c96..43b98f17e7 100644 --- a/core/mcp.go +++ b/core/mcp.go @@ -849,7 +849,7 @@ func (m *MCPManager) extractTextFromMCPResponse(toolResponse *mcp.CallToolResult // Handle typed content switch content := contentBlock.(type) { case mcp.TextContent: - result.WriteString(fmt.Sprintf("[Text Response: %s]\n", content.Text)) + result.WriteString(content.Text) case mcp.ImageContent: result.WriteString(fmt.Sprintf("[Image Response: %s, MIME: %s]\n", content.Data, content.MIMEType)) case mcp.AudioContent: diff --git a/transports/bifrost-http/handlers/logging.go b/transports/bifrost-http/handlers/logging.go new file mode 100644 index 0000000000..b576a9215f --- /dev/null +++ b/transports/bifrost-http/handlers/logging.go @@ -0,0 +1,158 @@ +// Package handlers provides HTTP request handlers for the Bifrost HTTP transport. +// This file contains logging-related handlers for log search, stats, and management. +package handlers + +import ( + "fmt" + "strconv" + "strings" + "time" + + "github.com/fasthttp/router" + "github.com/maximhq/bifrost/core/schemas" + "github.com/maximhq/bifrost/transports/bifrost-http/plugins/logging" + "github.com/valyala/fasthttp" +) + +// LoggingHandler manages HTTP requests for logging operations +type LoggingHandler struct { + logManager logging.LogManager + logger schemas.Logger +} + +// NewLoggingHandler creates a new logging handler instance +func NewLoggingHandler(logManager logging.LogManager, logger schemas.Logger) *LoggingHandler { + return &LoggingHandler{ + logManager: logManager, + logger: logger, + } +} + +// RegisterRoutes registers all logging-related routes +func (h *LoggingHandler) RegisterRoutes(r *router.Router) { + // Log retrieval with filtering, search, and pagination + r.GET("/v1/logs", h.GetLogs) +} + +// GetLogs handles GET /v1/logs - Get logs with filtering, search, and pagination via query parameters +func (h *LoggingHandler) GetLogs(ctx *fasthttp.RequestCtx) { + // Parse query parameters into filters + filters := &logging.SearchFilters{} + pagination := &logging.PaginationOptions{} + + // Extract filters from query parameters + if providers := string(ctx.QueryArgs().Peek("providers")); providers != "" { + filters.Providers = parseCommaSeparated(providers) + } + if models := string(ctx.QueryArgs().Peek("models")); models != "" { + filters.Models = parseCommaSeparated(models) + } + if statuses := string(ctx.QueryArgs().Peek("status")); statuses != "" { + filters.Status = parseCommaSeparated(statuses) + } + if objects := string(ctx.QueryArgs().Peek("objects")); objects != "" { + filters.Objects = parseCommaSeparated(objects) + } + if startTime := string(ctx.QueryArgs().Peek("start_time")); startTime != "" { + if t, err := time.Parse(time.RFC3339, startTime); err == nil { + filters.StartTime = &t + } + } + if endTime := string(ctx.QueryArgs().Peek("end_time")); endTime != "" { + if t, err := time.Parse(time.RFC3339, endTime); err == nil { + filters.EndTime = &t + } + } + if minLatency := string(ctx.QueryArgs().Peek("min_latency")); minLatency != "" { + if f, err := strconv.ParseFloat(minLatency, 64); err == nil { + filters.MinLatency = &f + } + } + if maxLatency := string(ctx.QueryArgs().Peek("max_latency")); maxLatency != "" { + if val, err := strconv.ParseFloat(maxLatency, 64); err == nil { + filters.MaxLatency = &val + } + } + if minTokens := string(ctx.QueryArgs().Peek("min_tokens")); minTokens != "" { + if val, err := strconv.Atoi(minTokens); err == nil { + filters.MinTokens = &val + } + } + if maxTokens := string(ctx.QueryArgs().Peek("max_tokens")); maxTokens != "" { + if val, err := strconv.Atoi(maxTokens); err == nil { + filters.MaxTokens = &val + } + } + if contentSearch := string(ctx.QueryArgs().Peek("content_search")); contentSearch != "" { + filters.ContentSearch = contentSearch + } + + // Extract pagination parameters + pagination.Limit = 50 // Default limit + if limit := string(ctx.QueryArgs().Peek("limit")); limit != "" { + if i, err := strconv.Atoi(limit); err == nil { + if i <= 0 { + SendError(ctx, fasthttp.StatusBadRequest, "limit must be greater than 0", h.logger) + return + } + if i > 1000 { + SendError(ctx, fasthttp.StatusBadRequest, "limit cannot exceed 1000", h.logger) + return + } + pagination.Limit = i + } + } + + pagination.Offset = 0 // Default offset + if offset := string(ctx.QueryArgs().Peek("offset")); offset != "" { + if i, err := strconv.Atoi(offset); err == nil { + if i < 0 { + SendError(ctx, fasthttp.StatusBadRequest, "offset cannot be negative", h.logger) + return + } + pagination.Offset = i + } + } + + // Sort parameters + pagination.SortBy = "timestamp" // Default sort field + if sortBy := string(ctx.QueryArgs().Peek("sort_by")); sortBy != "" { + if sortBy == "timestamp" || sortBy == "latency" || sortBy == "tokens" { + pagination.SortBy = sortBy + } + } + + pagination.Order = "desc" // Default sort order + if order := string(ctx.QueryArgs().Peek("order")); order != "" { + if order == "asc" || order == "desc" { + pagination.Order = order + } + } + + result, err := h.logManager.Search(filters, pagination) + if err != nil { + h.logger.Error(fmt.Errorf("failed to search logs: %w", err)) + SendError(ctx, fasthttp.StatusInternalServerError, fmt.Sprintf("Search failed: %v", err), h.logger) + return + } + + SendJSON(ctx, result, h.logger) +} + +// Helper functions + +// parseCommaSeparated splits a comma-separated string into a slice +func parseCommaSeparated(s string) []string { + if s == "" { + return nil + } + + var result []string + for _, item := range strings.Split(s, ",") { + if trimmed := strings.TrimSpace(item); trimmed != "" { + result = append(result, trimmed) + } + } + + return result +} diff --git a/transports/bifrost-http/handlers/providers.go b/transports/bifrost-http/handlers/providers.go index 9d34e0cc43..bdf685eb21 100644 --- a/transports/bifrost-http/handlers/providers.go +++ b/transports/bifrost-http/handlers/providers.go @@ -37,22 +37,26 @@ type AddProviderRequest struct { NetworkConfig *schemas.NetworkConfig `json:"network_config,omitempty"` // Network-related settings MetaConfig *map[string]interface{} `json:"meta_config,omitempty"` // Provider-specific metadata ConcurrencyAndBufferSize *schemas.ConcurrencyAndBufferSize `json:"concurrency_and_buffer_size,omitempty"` // Concurrency settings + ProxyConfig *schemas.ProxyConfig `json:"proxy_config,omitempty"` // Proxy configuration } // UpdateProviderRequest represents the request body for updating a provider type UpdateProviderRequest struct { - Keys *[]schemas.Key `json:"keys,omitempty"` // API keys for the provider - NetworkConfig *schemas.NetworkConfig `json:"network_config,omitempty"` // Network-related settings - MetaConfig *map[string]interface{} `json:"meta_config,omitempty"` // Provider-specific metadata - ConcurrencyAndBufferSize *schemas.ConcurrencyAndBufferSize `json:"concurrency_and_buffer_size,omitempty"` // Concurrency settings + Keys []schemas.Key `json:"keys"` // API keys for the provider + NetworkConfig schemas.NetworkConfig `json:"network_config"` // Network-related settings + MetaConfig *map[string]interface{} `json:"meta_config,omitempty"` // Provider-specific metadata + ConcurrencyAndBufferSize schemas.ConcurrencyAndBufferSize `json:"concurrency_and_buffer_size"` // Concurrency settings + ProxyConfig *schemas.ProxyConfig `json:"proxy_config,omitempty"` // Proxy configuration } // ProviderResponse represents the response for provider operations type ProviderResponse struct { - Provider schemas.ModelProvider `json:"provider"` - Config *lib.ProviderConfig `json:"config,omitempty"` - Status string `json:"status"` - Message string `json:"message,omitempty"` + Name schemas.ModelProvider `json:"name"` + Keys []schemas.Key `json:"keys"` // API keys for the provider + NetworkConfig schemas.NetworkConfig `json:"network_config"` // Network-related settings + MetaConfig *schemas.MetaConfig `json:"meta_config"` // Provider-specific metadata + ConcurrencyAndBufferSize schemas.ConcurrencyAndBufferSize `json:"concurrency_and_buffer_size"` // Concurrency settings + ProxyConfig *schemas.ProxyConfig `json:"proxy_config"` // Proxy configuration } // ListProvidersResponse represents the response for listing all providers @@ -95,18 +99,12 @@ func (h *ProviderHandler) ListProviders(ctx *fasthttp.RequestCtx) { h.logger.Warn(fmt.Sprintf("Failed to get config for provider %s: %v", provider, err)) // Include provider even if config fetch fails providerResponses = append(providerResponses, ProviderResponse{ - Provider: provider, - Status: "error", - Message: fmt.Sprintf("Failed to get config: %v", err), + Name: provider, }) continue } - providerResponses = append(providerResponses, ProviderResponse{ - Provider: provider, - Config: config, - Status: "active", - }) + providerResponses = append(providerResponses, h.getProviderResponseFromConfig(provider, *config)) } response := ListProvidersResponse{ @@ -131,11 +129,7 @@ func (h *ProviderHandler) GetProvider(ctx *fasthttp.RequestCtx) { return } - response := ProviderResponse{ - Provider: provider, - Config: config, - Status: "active", - } + response := h.getProviderResponseFromConfig(provider, *config) SendJSON(ctx, response, h.logger) } @@ -155,7 +149,7 @@ func (h *ProviderHandler) AddProvider(ctx *fasthttp.RequestCtx) { } // Validate required keys - if len(req.Keys) == 0 { + if len(req.Keys) == 0 && req.Provider != schemas.Vertex && req.Provider != schemas.Ollama { SendError(ctx, fasthttp.StatusBadRequest, "At least one API key is required", h.logger) return } @@ -204,12 +198,7 @@ func (h *ProviderHandler) AddProvider(ctx *fasthttp.RequestCtx) { h.logger.Info(fmt.Sprintf("Provider %s added successfully", req.Provider)) - response := ProviderResponse{ - Provider: req.Provider, - Config: &config, - Status: "added", - Message: fmt.Sprintf("Provider %s added successfully", req.Provider), - } + response := h.getProviderResponseFromConfig(req.Provider, config) SendJSON(ctx, response, h.logger) } @@ -247,11 +236,11 @@ func (h *ProviderHandler) UpdateProvider(ctx *fasthttp.RequestCtx) { // Validate required keys (at least one key must be provided) if req.Keys != nil { - if len(*req.Keys) == 0 { + if len(req.Keys) == 0 && provider != schemas.Vertex && provider != schemas.Ollama { SendError(ctx, fasthttp.StatusBadRequest, "At least one API key is required", h.logger) return } - config.Keys = *req.Keys + config.Keys = req.Keys } // Handle meta config if provided @@ -265,17 +254,23 @@ func (h *ProviderHandler) UpdateProvider(ctx *fasthttp.RequestCtx) { config.MetaConfig = metaConfig } - if req.ConcurrencyAndBufferSize != nil { - if req.ConcurrencyAndBufferSize.Concurrency == 0 { - SendError(ctx, fasthttp.StatusBadRequest, "Concurrency must be greater than 0", h.logger) - return - } - if req.ConcurrencyAndBufferSize.BufferSize == 0 { - SendError(ctx, fasthttp.StatusBadRequest, "Buffer size must be greater than 0", h.logger) - return - } - config.ConcurrencyAndBufferSize = req.ConcurrencyAndBufferSize + if req.ConcurrencyAndBufferSize.Concurrency == 0 { + SendError(ctx, fasthttp.StatusBadRequest, "Concurrency must be greater than 0", h.logger) + return } + if req.ConcurrencyAndBufferSize.BufferSize == 0 { + SendError(ctx, fasthttp.StatusBadRequest, "Buffer size must be greater than 0", h.logger) + return + } + + if req.ConcurrencyAndBufferSize.Concurrency > req.ConcurrencyAndBufferSize.BufferSize { + SendError(ctx, fasthttp.StatusBadRequest, "Concurrency must be less than or equal to buffer size", h.logger) + return + } + + config.ConcurrencyAndBufferSize = &req.ConcurrencyAndBufferSize + config.NetworkConfig = &req.NetworkConfig + config.ProxyConfig = req.ProxyConfig // Update provider config in store (env vars will be processed by store) if err := h.store.UpdateProviderConfig(provider, config); err != nil { @@ -284,20 +279,16 @@ func (h *ProviderHandler) UpdateProvider(ctx *fasthttp.RequestCtx) { return } - // Update concurrency and queue configuration in Bifrost - if err := h.client.UpdateProviderConcurrency(provider); err != nil { - // Note: Store update succeeded, continue but log the concurrency update failure - h.logger.Warn(fmt.Sprintf("Failed to update concurrency for provider %s: %v", provider, err)) + if config.ConcurrencyAndBufferSize.Concurrency != oldConfig.ConcurrencyAndBufferSize.Concurrency || + config.ConcurrencyAndBufferSize.BufferSize != oldConfig.ConcurrencyAndBufferSize.BufferSize { + // Update concurrency and queue configuration in Bifrost + if err := h.client.UpdateProviderConcurrency(provider); err != nil { + // Note: Store update succeeded, continue but log the concurrency update failure + h.logger.Warn(fmt.Sprintf("Failed to update concurrency for provider %s: %v", provider, err)) + } } - h.logger.Info(fmt.Sprintf("Provider %s updated successfully", provider)) - - response := ProviderResponse{ - Provider: provider, - Config: &config, - Status: "updated", - Message: fmt.Sprintf("Provider %s updated successfully", provider), - } + response := h.getProviderResponseFromConfig(provider, config) SendJSON(ctx, response, h.logger) } @@ -326,9 +317,7 @@ func (h *ProviderHandler) DeleteProvider(ctx *fasthttp.RequestCtx) { h.logger.Info(fmt.Sprintf("Provider %s removed successfully", provider)) response := ProviderResponse{ - Provider: provider, - Status: "removed", - Message: fmt.Sprintf("Provider %s removed successfully", provider), + Name: provider, } SendJSON(ctx, response, h.logger) @@ -396,6 +385,24 @@ func (h *ProviderHandler) convertToProviderMetaConfig(provider schemas.ModelProv } } +func (h *ProviderHandler) getProviderResponseFromConfig(provider schemas.ModelProvider, config lib.ProviderConfig) ProviderResponse { + if config.NetworkConfig == nil { + config.NetworkConfig = &schemas.DefaultNetworkConfig + } + if config.ConcurrencyAndBufferSize == nil { + config.ConcurrencyAndBufferSize = &schemas.DefaultConcurrencyAndBufferSize + } + + return ProviderResponse{ + Name: provider, + Keys: config.Keys, + NetworkConfig: *config.NetworkConfig, + MetaConfig: config.MetaConfig, + ConcurrencyAndBufferSize: *config.ConcurrencyAndBufferSize, + ProxyConfig: config.ProxyConfig, + } +} + func getProviderFromCtx(ctx *fasthttp.RequestCtx) (schemas.ModelProvider, error) { providerValue := ctx.UserValue("provider") if providerValue == nil { diff --git a/transports/bifrost-http/handlers/websocket.go b/transports/bifrost-http/handlers/websocket.go new file mode 100644 index 0000000000..4900f2fada --- /dev/null +++ b/transports/bifrost-http/handlers/websocket.go @@ -0,0 +1,163 @@ +// Package handlers provides HTTP request handlers for the Bifrost HTTP transport. +// This file contains WebSocket handlers for real-time log streaming. +package handlers + +import ( + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/fasthttp/router" + "github.com/fasthttp/websocket" + "github.com/maximhq/bifrost/core/schemas" + "github.com/maximhq/bifrost/transports/bifrost-http/plugins/logging" + "github.com/valyala/fasthttp" +) + +// WebSocketHandler manages WebSocket connections for real-time updates +type WebSocketHandler struct { + logManager logging.LogManager + logger schemas.Logger + clients map[*websocket.Conn]bool + mu sync.RWMutex + stopChan chan struct{} // Channel to signal heartbeat goroutine to stop + done chan struct{} // Channel to signal when heartbeat goroutine has stopped +} + +// NewWebSocketHandler creates a new WebSocket handler instance +func NewWebSocketHandler(logManager logging.LogManager, logger schemas.Logger) *WebSocketHandler { + return &WebSocketHandler{ + logManager: logManager, + logger: logger, + clients: make(map[*websocket.Conn]bool), + stopChan: make(chan struct{}), + done: make(chan struct{}), + } +} + +// RegisterRoutes registers all WebSocket-related routes +func (h *WebSocketHandler) RegisterRoutes(r *router.Router) { + r.GET("/ws/logs", h.HandleLogStream) +} + +// WebSocket upgrader configuration +var upgrader = websocket.FastHTTPUpgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(ctx *fasthttp.RequestCtx) bool { + return true // Allow all origins since we are running on localhost, restrictions can be added on network level + }, +} + +// HandleLogStream handles WebSocket connections for real-time log streaming +func (h *WebSocketHandler) HandleLogStream(ctx *fasthttp.RequestCtx) { + err := upgrader.Upgrade(ctx, func(ws *websocket.Conn) { + // Register new client + h.mu.Lock() + h.clients[ws] = true + h.mu.Unlock() + + // Clean up on disconnect + defer func() { + h.mu.Lock() + delete(h.clients, ws) + h.mu.Unlock() + ws.Close() + }() + + // Keep connection alive and handle client messages + // This loop continuously reads and discards incoming WebSocket messages to: + // 1. Keep the connection alive by processing client pings and control frames + // 2. Detect when the client disconnects by watching for close frames or errors + // 3. Maintain proper WebSocket protocol handling without accumulating messages + for { + _, _, err := ws.ReadMessage() + if err != nil { + // Only log unexpected close errors + if websocket.IsUnexpectedCloseError(err, + websocket.CloseNormalClosure, + websocket.CloseGoingAway, + websocket.CloseAbnormalClosure, + websocket.CloseNoStatusReceived) { + h.logger.Error(fmt.Errorf("websocket read error: %v", err)) + } + break + } + } + }) + + if err != nil { + h.logger.Error(fmt.Errorf("websocket upgrade error: %v", err)) + return + } +} + +// BroadcastLogUpdate sends a log update to all connected WebSocket clients +func (h *WebSocketHandler) BroadcastLogUpdate(logEntry *logging.LogEntry) { + message := struct { + Type string `json:"type"` + Payload *logging.LogEntry `json:"payload"` + }{ + Type: "log", + Payload: logEntry, + } + + data, err := json.Marshal(message) + if err != nil { + h.logger.Error(fmt.Errorf("failed to marshal log entry: %v", err)) + return + } + + h.mu.RLock() + defer h.mu.RUnlock() + + for client := range h.clients { + err := client.WriteMessage(websocket.TextMessage, data) + if err != nil { + h.logger.Error(fmt.Errorf("failed to send message to client: %v", err)) + continue + } + } +} + +// StartHeartbeat starts sending periodic heartbeat messages to keep connections alive +func (h *WebSocketHandler) StartHeartbeat() { + ticker := time.NewTicker(30 * time.Second) + go func() { + defer func() { + ticker.Stop() + close(h.done) + }() + + for { + select { + case <-ticker.C: + h.mu.RLock() + for client := range h.clients { + err := client.WriteMessage(websocket.PingMessage, nil) + if err != nil { + h.logger.Error(fmt.Errorf("failed to send heartbeat: %v", err)) + } + } + h.mu.RUnlock() + case <-h.stopChan: + return + } + } + }() +} + +// Stop gracefully shuts down the WebSocket handler +func (h *WebSocketHandler) Stop() { + close(h.stopChan) // Signal heartbeat goroutine to stop + <-h.done // Wait for heartbeat goroutine to finish + + // Close all client connections + h.mu.Lock() + for client := range h.clients { + client.Close() + } + h.clients = make(map[*websocket.Conn]bool) + h.mu.Unlock() +} diff --git a/transports/bifrost-http/lib/config.go b/transports/bifrost-http/lib/config.go index 7d6c0c5324..733a500a31 100644 --- a/transports/bifrost-http/lib/config.go +++ b/transports/bifrost-http/lib/config.go @@ -20,6 +20,7 @@ type ProviderConfig struct { NetworkConfig *schemas.NetworkConfig `json:"network_config,omitempty"` // Network-related settings MetaConfig *schemas.MetaConfig `json:"-"` // Provider-specific metadata ConcurrencyAndBufferSize *schemas.ConcurrencyAndBufferSize `json:"concurrency_and_buffer_size,omitempty"` // Concurrency settings + ProxyConfig *schemas.ProxyConfig `json:"proxy_config,omitempty"` // Proxy configuration } // ConfigMap maps provider names to their configurations. diff --git a/transports/bifrost-http/lib/ctx.go b/transports/bifrost-http/lib/ctx.go index 690b7d24f1..bfd29e415b 100644 --- a/transports/bifrost-http/lib/ctx.go +++ b/transports/bifrost-http/lib/ctx.go @@ -11,7 +11,7 @@ import ( "strings" "github.com/maximhq/bifrost/plugins/maxim" - "github.com/maximhq/bifrost/transports/bifrost-http/tracking" + "github.com/maximhq/bifrost/transports/bifrost-http/plugins/telemetry" "github.com/valyala/fasthttp" ) @@ -58,7 +58,7 @@ func ConvertToBifrostContext(ctx *fasthttp.RequestCtx) *context.Context { if strings.HasPrefix(keyStr, "x-bf-prom-") { labelName := strings.TrimPrefix(keyStr, "x-bf-prom-") - bifrostCtx = context.WithValue(bifrostCtx, tracking.PrometheusContextKey(labelName), string(value)) + bifrostCtx = context.WithValue(bifrostCtx, telemetry.PrometheusContextKey(labelName), string(value)) } if strings.HasPrefix(keyStr, "x-bf-maxim-") { diff --git a/transports/bifrost-http/main.go b/transports/bifrost-http/main.go index 73ff171c08..a2b37228f2 100644 --- a/transports/bifrost-http/main.go +++ b/transports/bifrost-http/main.go @@ -63,7 +63,8 @@ import ( "github.com/maximhq/bifrost/plugins/maxim" "github.com/maximhq/bifrost/transports/bifrost-http/handlers" "github.com/maximhq/bifrost/transports/bifrost-http/lib" - "github.com/maximhq/bifrost/transports/bifrost-http/tracking" + "github.com/maximhq/bifrost/transports/bifrost-http/plugins/logging" + "github.com/maximhq/bifrost/transports/bifrost-http/plugins/telemetry" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -149,7 +150,7 @@ func main() { log.Fatalf("failed to parse config JSON: %v", err) } - tracking.InitPrometheusMetrics(config.Client.PrometheusLabels) + telemetry.InitPrometheusMetrics(config.Client.PrometheusLabels) log.Println("Prometheus Go/Process collectors registered.") // Initialize high-performance configuration store with caching @@ -196,8 +197,13 @@ func main() { } } - promPlugin := tracking.NewPrometheusPlugin() - loadedPlugins = append(loadedPlugins, promPlugin) + promPlugin := telemetry.NewPrometheusPlugin() + loggingPlugin, err := logging.NewLoggerPlugin(nil) + if err != nil { + log.Fatalf("failed to initialize logging plugin: %v", err) + } + + loadedPlugins = append(loadedPlugins, promPlugin, loggingPlugin) client, err := bifrost.Init(schemas.BifrostConfig{ Account: account, @@ -219,6 +225,14 @@ func main() { mcpHandler := handlers.NewMCPHandler(client, logger, store) integrationHandler := handlers.NewIntegrationHandler(client) configHandler := handlers.NewConfigHandler(client, logger, configPath) + loggingHandler := handlers.NewLoggingHandler(loggingPlugin.GetPluginLogManager(), logger) + wsHandler := handlers.NewWebSocketHandler(loggingPlugin.GetPluginLogManager(), logger) + + // Set up WebSocket callback for real-time log updates + loggingPlugin.SetLogCallback(wsHandler.BroadcastLogUpdate) + + // Start WebSocket heartbeat + wsHandler.StartHeartbeat() r := router.New() @@ -228,6 +242,8 @@ func main() { mcpHandler.RegisterRoutes(r) integrationHandler.RegisterRoutes(r) configHandler.RegisterRoutes(r) + loggingHandler.RegisterRoutes(r) + wsHandler.RegisterRoutes(r) // Add Prometheus /metrics endpoint r.GET("/metrics", fasthttpadaptor.NewFastHTTPHandler(promhttp.Handler())) @@ -243,7 +259,7 @@ func main() { r.Handler(ctx) return } - tracking.PrometheusMiddleware(r.Handler)(ctx) + telemetry.PrometheusMiddleware(r.Handler)(ctx) }, } @@ -252,5 +268,6 @@ func main() { log.Fatalf("failed to start server: %v", err) } + wsHandler.Stop() client.Cleanup() } diff --git a/transports/bifrost-http/plugins/logging/main.go b/transports/bifrost-http/plugins/logging/main.go new file mode 100644 index 0000000000..d70000b89a --- /dev/null +++ b/transports/bifrost-http/plugins/logging/main.go @@ -0,0 +1,373 @@ +// Package logging provides a BadgerDB-based logging plugin for Bifrost. +// This plugin stores comprehensive logs of all requests and responses with search, +// filter, and pagination capabilities. +package logging + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/dgraph-io/badger/v4" + "github.com/google/uuid" + "github.com/maximhq/bifrost/core/schemas" +) + +const ( + PluginName = "bifrost-http-logging" + + // Key prefixes for different data types + LogPrefix = "log:" + IndexPrefix = "idx:" + StatsPrefix = "stats:" + + // Index types + ProviderIndex = "provider:" + ModelIndex = "model:" + TimestampIndex = "timestamp:" + StatusIndex = "status:" + LatencyIndex = "latency:" + TokenIndex = "token:" +) + +// ContextKey is a custom type for context keys to prevent collisions +type ContextKey string + +const ( + RequestProviderKey ContextKey = "bifrost-http-logging-provider" + RequestModelKey ContextKey = "bifrost-http-logging-model" + RequestObjectKey ContextKey = "bifrost-http-logging-object" + RequestStartTimeKey ContextKey = "bifrost-http-logging-start-time" + RequestChatHistory ContextKey = "bifrost-http-logging-chat-history" +) + +// LogEntry represents a complete log entry for a request/response cycle +type LogEntry struct { + ID string `json:"id"` + Timestamp time.Time `json:"timestamp"` + Provider string `json:"provider"` + Model string `json:"model"` + Object string `json:"object"` // text.completion, chat.completion, or embedding + InputHistory []schemas.BifrostMessage `json:"input_history,omitempty"` + InputText *string `json:"input_text,omitempty"` + OutputMessage *schemas.BifrostMessage `json:"output_message,omitempty"` + Params *schemas.ModelParameters `json:"params,omitempty"` + Tools *[]schemas.Tool `json:"tools,omitempty"` + ToolCalls *[]schemas.ToolCall `json:"tool_calls,omitempty"` + Latency *float64 `json:"latency,omitempty"` + TokenUsage *schemas.LLMUsage `json:"token_usage,omitempty"` + Status string `json:"status"` // "success" or "error" + ErrorDetails *schemas.BifrostError `json:"error_details,omitempty"` + ExtraFields map[string]interface{} `json:"extra_fields,omitempty"` +} + +// SearchFilters represents the available filters for log searches +type SearchFilters struct { + Providers []string `json:"providers,omitempty"` + Models []string `json:"models,omitempty"` + Status []string `json:"status,omitempty"` + Objects []string `json:"objects,omitempty"` // For filtering by request type (chat.completion, text.completion, embedding) + StartTime *time.Time `json:"start_time,omitempty"` + EndTime *time.Time `json:"end_time,omitempty"` + MinLatency *float64 `json:"min_latency,omitempty"` + MaxLatency *float64 `json:"max_latency,omitempty"` + MinTokens *int `json:"min_tokens,omitempty"` + MaxTokens *int `json:"max_tokens,omitempty"` + ContentSearch string `json:"content_search,omitempty"` +} + +// PaginationOptions represents pagination parameters +type PaginationOptions struct { + Limit int `json:"limit"` + Offset int `json:"offset"` + SortBy string `json:"sort_by"` // "timestamp", "latency", "tokens" + Order string `json:"order"` // "asc", "desc" +} + +// SearchResult represents the result of a log search +type SearchResult struct { + Logs []LogEntry `json:"logs"` + Pagination PaginationOptions `json:"pagination"` + Stats struct { + TotalRequests int64 `json:"total_requests"` + SuccessRate float64 `json:"success_rate"` // Percentage of successful requests + AverageLatency float64 `json:"average_latency"` // Average latency in milliseconds + TotalTokens int64 `json:"total_tokens"` // Total tokens used + } `json:"stats"` +} + +// LogStats represents aggregated statistics +type LogStats struct { + TotalRequests int64 `json:"total_requests"` + SuccessfulRequests int64 `json:"successful_requests"` + FailedRequests int64 `json:"failed_requests"` + ProviderStats map[string]int64 `json:"provider_stats"` + ModelStats map[string]int64 `json:"model_stats"` + AverageLatency float64 `json:"average_latency"` + TotalTokens int64 `json:"total_tokens"` + LastUpdated time.Time `json:"last_updated"` +} + +// Config represents the configuration for the logging plugin +type Config struct { + DatabasePath string `json:"database_path"` +} + +// LogCallback is a function that gets called when a new log entry is created +type LogCallback func(*LogEntry) + +// LoggerPlugin implements the schemas.Plugin interface +type LoggerPlugin struct { + config *Config + db *badger.DB + mu sync.RWMutex + stats *LogStats + logQueue chan *LogEntry + done chan struct{} + wg sync.WaitGroup + logCallback LogCallback // Callback for real-time log updates +} + +// NewLoggerPlugin creates a new logging plugin +func NewLoggerPlugin(config *Config) (*LoggerPlugin, error) { + if config == nil { + config = &Config{ + DatabasePath: "./badger_logs", + } + } + + // Open BadgerDB + opts := badger.DefaultOptions(config.DatabasePath) + opts.Logger = nil // Disable BadgerDB's own logging to avoid noise + + db, err := badger.Open(opts) + if err != nil { + return nil, fmt.Errorf("failed to open BadgerDB: %w", err) + } + + plugin := &LoggerPlugin{ + config: config, + db: db, + logQueue: make(chan *LogEntry, 1000), // Buffer for 1000 log entries + done: make(chan struct{}), + stats: &LogStats{ + ProviderStats: make(map[string]int64), + ModelStats: make(map[string]int64), + }, + } + + // Start background worker + plugin.wg.Add(1) + go plugin.backgroundWorker() + + return plugin, nil +} + +// SetLogCallback sets the callback function for real-time log updates +func (p *LoggerPlugin) SetLogCallback(callback LogCallback) { + p.mu.Lock() + defer p.mu.Unlock() + p.logCallback = callback +} + +// processLogEntry handles storing a log entry and calling any registered callbacks +func (p *LoggerPlugin) processLogEntry(entry *LogEntry, isShutdown bool) { + // Store the log entry + if err := p.storeLogEntry(entry); err != nil { + if isShutdown { + fmt.Printf("BadgerDB Logger: failed to store log entry during shutdown: %v\n", err) + } else { + fmt.Printf("BadgerDB Logger: failed to store log entry: %v\n", err) + } + } + + // Call the callback if set + p.mu.RLock() + if p.logCallback != nil { + p.logCallback(entry) + } + p.mu.RUnlock() +} + +// backgroundWorker processes log entries asynchronously +func (p *LoggerPlugin) backgroundWorker() { + defer p.wg.Done() + + for { + select { + case entry := <-p.logQueue: + p.processLogEntry(entry, false) + + case <-p.done: + // Drain the remaining queue before exiting + for { + select { + case entry := <-p.logQueue: + p.processLogEntry(entry, true) + default: + return + } + } + } + } +} + +// GetName returns the name of the plugin +func (p *LoggerPlugin) GetName() string { + return PluginName +} + +// PreHook is called before a request is processed +func (p *LoggerPlugin) PreHook(ctx *context.Context, req *schemas.BifrostRequest) (*schemas.BifrostRequest, *schemas.PluginShortCircuit, error) { + // Generate unique request ID and record start time + startTime := time.Now() + + // Store request ID and start time in context + if ctx != nil { + *ctx = context.WithValue(*ctx, RequestProviderKey, req.Provider) + *ctx = context.WithValue(*ctx, RequestModelKey, req.Model) + *ctx = context.WithValue(*ctx, RequestObjectKey, func() string { + if req.Input.ChatCompletionInput != nil { + return "chat.completion" + } else if req.Input.TextCompletionInput != nil { + return "text.completion" + } else if req.Input.EmbeddingInput != nil { + return "embedding" + } + return "unknown" + }()) + *ctx = context.WithValue(*ctx, RequestStartTimeKey, startTime) + + if req.Input.ChatCompletionInput != nil { + *ctx = context.WithValue(*ctx, RequestChatHistory, *req.Input.ChatCompletionInput) + } else if req.Input.TextCompletionInput != nil { + *ctx = context.WithValue(*ctx, RequestChatHistory, []schemas.BifrostMessage{ + { + Role: schemas.ModelChatMessageRoleUser, + Content: schemas.MessageContent{ + ContentStr: req.Input.TextCompletionInput, + }, + }, + }) + } + } + + return req, nil, nil +} + +// PostHook is called after a response is received +func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostResponse, err *schemas.BifrostError) (*schemas.BifrostResponse, *schemas.BifrostError, error) { + // Extract request metadata from context + var requestID string + var startTime time.Time + + if ctx != nil { + if st, ok := (*ctx).Value(RequestStartTimeKey).(time.Time); ok { + startTime = st + } + } + + if requestID == "" { + requestID = uuid.New().String() + } + if startTime.IsZero() { + startTime = time.Now() + } + + // Calculate latency + latency := float64(time.Since(startTime).Milliseconds()) + + // Create log entry + logEntry := &LogEntry{ + ID: requestID, + Timestamp: startTime, + } + + // Determine status and populate entry + if err != nil { + logEntry.Status = "error" + logEntry.ErrorDetails = err + + if ctx != nil { + if provider, ok := (*ctx).Value(RequestProviderKey).(schemas.ModelProvider); ok { + logEntry.Provider = string(provider) + } + if model, ok := (*ctx).Value(RequestModelKey).(string); ok { + logEntry.Model = model + } + if chatHistory, ok := (*ctx).Value(RequestChatHistory).([]schemas.BifrostMessage); ok { + logEntry.InputHistory = chatHistory + } else { + logEntry.InputHistory = []schemas.BifrostMessage{} + } + if object, ok := (*ctx).Value(RequestObjectKey).(string); ok { + logEntry.Object = object + } + } + + } else { + logEntry.Status = "success" + + if result != nil { + // Set model and latency which don't depend on ExtraFields + logEntry.Model = result.Model + logEntry.Latency = &latency + logEntry.TokenUsage = &result.Usage + + // Handle ExtraFields safely + // Set provider if available + if result.ExtraFields.Provider != "" { + logEntry.Provider = string(result.ExtraFields.Provider) + } + + // Set params if available + if result.ExtraFields.Params.Tools != nil { + logEntry.Tools = result.ExtraFields.Params.Tools + logEntry.Params = &result.ExtraFields.Params + } + + // Extract chat history if available + if result.ExtraFields.ChatHistory != nil { + logEntry.InputHistory = *result.ExtraFields.ChatHistory + } + + // Extract output message and tool calls + if len(result.Choices) > 0 { + logEntry.OutputMessage = &result.Choices[0].Message + + // Extract tool calls if present + if result.Choices[0].Message.AssistantMessage != nil && + result.Choices[0].Message.AssistantMessage.ToolCalls != nil { + logEntry.ToolCalls = result.Choices[0].Message.AssistantMessage.ToolCalls + } + } + } + } + + // Queue the log entry for async processing (non-blocking) + select { + case p.logQueue <- logEntry: + // Successfully queued + default: + // Queue is full, log warning but don't block the request + fmt.Printf("Logger: log queue is full, dropping log entry\n") + } + + return result, err, nil +} + +// Cleanup is called when the plugin is being shut down +func (p *LoggerPlugin) Cleanup() error { + // Signal the background worker to stop + close(p.done) + + // Wait for the background worker to finish processing remaining items + p.wg.Wait() + + // Close the database + if p.db != nil { + return p.db.Close() + } + return nil +} diff --git a/transports/bifrost-http/plugins/logging/utils.go b/transports/bifrost-http/plugins/logging/utils.go new file mode 100644 index 0000000000..bc3f7cec05 --- /dev/null +++ b/transports/bifrost-http/plugins/logging/utils.go @@ -0,0 +1,672 @@ +package logging + +import ( + "encoding/json" + "fmt" + "math" + "slices" + "sort" + "strconv" + "strings" + "time" + + "github.com/dgraph-io/badger/v4" +) + +// storeLogEntry stores a log entry in BadgerDB with optional indexing +func (p *LoggerPlugin) storeLogEntry(entry *LogEntry) error { + p.mu.Lock() + defer p.mu.Unlock() + + // Serialize the log entry + data, err := json.Marshal(entry) + if err != nil { + return fmt.Errorf("failed to marshal log entry: %w", err) + } + + return p.db.Update(func(txn *badger.Txn) error { + // Store the main log entry + logKey := LogPrefix + entry.ID + if err := txn.Set([]byte(logKey), data); err != nil { + return err + } + + // Create indexes + if err := p.createIndexes(txn, entry); err != nil { + return err + } + + return nil + }) +} + +// createIndexes creates various indexes for efficient searching +func (p *LoggerPlugin) createIndexes(txn *badger.Txn, entry *LogEntry) error { + timestamp := entry.Timestamp.Unix() + + // Provider index + if entry.Provider != "" { + providerKey := fmt.Sprintf("%s%s%s:%d:%s", IndexPrefix, ProviderIndex, entry.Provider, timestamp, entry.ID) + if err := txn.Set([]byte(providerKey), []byte(entry.ID)); err != nil { + return err + } + } + + // Model index + if entry.Model != "" { + modelKey := fmt.Sprintf("%s%s%s:%d:%s", IndexPrefix, ModelIndex, entry.Model, timestamp, entry.ID) + if err := txn.Set([]byte(modelKey), []byte(entry.ID)); err != nil { + return err + } + } + + // Timestamp index + timestampKey := fmt.Sprintf("%s%s%d:%s", IndexPrefix, TimestampIndex, timestamp, entry.ID) + if err := txn.Set([]byte(timestampKey), []byte(entry.ID)); err != nil { + return err + } + + // Status index + statusKey := fmt.Sprintf("%s%s%s:%d:%s", IndexPrefix, StatusIndex, entry.Status, timestamp, entry.ID) + if err := txn.Set([]byte(statusKey), []byte(entry.ID)); err != nil { + return err + } + + // Latency index (if available) + if entry.Latency != nil { + latencyBucket := getLatencyBucket(*entry.Latency) + latencyKey := fmt.Sprintf("%s%s%d:%d:%s", IndexPrefix, LatencyIndex, latencyBucket, timestamp, entry.ID) + if err := txn.Set([]byte(latencyKey), []byte(entry.ID)); err != nil { + return err + } + } + + // Token count index (if available) + if entry.TokenUsage != nil { + tokenBucket := getTokenBucket(entry.TokenUsage.TotalTokens) + tokenKey := fmt.Sprintf("%s%s%d:%d:%s", IndexPrefix, TokenIndex, tokenBucket, timestamp, entry.ID) + if err := txn.Set([]byte(tokenKey), []byte(entry.ID)); err != nil { + return err + } + } + + return nil +} + +// SearchLogs searches for log entries based on filters and pagination options +func (p *LoggerPlugin) SearchLogs(filters *SearchFilters, pagination *PaginationOptions) (*SearchResult, error) { + var result SearchResult + var successfulRequests int64 + var totalLatency float64 + var logsWithLatency int + var totalTokens int64 + + if pagination == nil { + pagination = &PaginationOptions{ + Limit: 50, + Offset: 0, + SortBy: "timestamp", + Order: "desc", + } + } + + // Initialize result stats + result.Stats.TotalRequests = 0 + result.Stats.SuccessRate = 0 + result.Stats.AverageLatency = 0 + result.Stats.TotalTokens = 0 + result.Pagination = *pagination + + err := p.db.View(func(txn *badger.Txn) error { + // Get matching IDs using indexes + var matchingIDs []string + if filters != nil { + matchingIDs = p.searchWithIndexes(txn, filters) + } else { + matchingIDs = p.searchFullScan(txn) + } + + // Early return if no matches + if len(matchingIDs) == 0 { + result.Stats.TotalRequests = 0 + return nil + } + + // Sort IDs based on pagination options + p.sortIDs(txn, matchingIDs, pagination.SortBy, pagination.Order) + + // Calculate total for stats + result.Stats.TotalRequests = int64(len(matchingIDs)) + + // Apply offset and limit for efficient pagination + start := pagination.Offset + if start >= len(matchingIDs) { + return nil + } + end := min(start+pagination.Limit, len(matchingIDs)) + pageIDs := matchingIDs[start:end] + + // Fetch only the required log entries for the current page + for _, id := range pageIDs { + entry, err := p.getLogEntryByID(txn, id) + if err != nil { + continue + } + + // Verify the entry matches all filters + if p.matchesFilters(entry, filters) { + result.Logs = append(result.Logs, *entry) + + // Update statistics + if entry.Status == "success" { + successfulRequests++ + } + if entry.Latency != nil { + totalLatency += *entry.Latency + logsWithLatency++ + } + if entry.TokenUsage != nil { + totalTokens += int64(entry.TokenUsage.TotalTokens) + } + } + } + + return nil + }) + + if err != nil { + return nil, err + } + + // Calculate final statistics + if result.Stats.TotalRequests > 0 { + result.Stats.SuccessRate = float64(successfulRequests) / float64(result.Stats.TotalRequests) * 100 + } + if logsWithLatency > 0 { + result.Stats.AverageLatency = totalLatency / float64(logsWithLatency) + } + result.Stats.TotalTokens = totalTokens + + return &result, nil +} + +// searchWithIndexes uses indexes to find matching log IDs efficiently +func (p *LoggerPlugin) searchWithIndexes(txn *badger.Txn, filters *SearchFilters) []string { + var candidateIDs []string + var hasFilters bool + + // Start with timestamp range if specified + if filters.StartTime != nil || filters.EndTime != nil { + candidateIDs = p.searchByTimeRange(txn, filters.StartTime, filters.EndTime) + hasFilters = true + } + + // Intersect with other filters + if len(filters.Providers) > 0 { + providerIDs := p.searchByProviders(txn, filters.Providers) + if !hasFilters { + candidateIDs = providerIDs + hasFilters = true + } else { + candidateIDs = p.intersectIDLists(candidateIDs, providerIDs) + } + } + + if len(filters.Models) > 0 { + modelIDs := p.searchByModels(txn, filters.Models) + if !hasFilters { + candidateIDs = modelIDs + hasFilters = true + } else { + candidateIDs = p.intersectIDLists(candidateIDs, modelIDs) + } + } + + if len(filters.Status) > 0 { + statusIDs := p.searchByStatus(txn, filters.Status) + if !hasFilters { + candidateIDs = statusIDs + hasFilters = true + } else { + candidateIDs = p.intersectIDLists(candidateIDs, statusIDs) + } + } + + // If no filters were applied, return all logs + if !hasFilters { + return p.searchFullScan(txn) + } + + return candidateIDs +} + +// searchFullScan performs a full database scan (fallback when indexes are disabled) +func (p *LoggerPlugin) searchFullScan(txn *badger.Txn) []string { + var matchingIDs []string + + opts := badger.DefaultIteratorOptions + opts.PrefetchValues = false + it := txn.NewIterator(opts) + defer it.Close() + + prefix := []byte(LogPrefix) + for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + item := it.Item() + key := string(item.Key()) + id := strings.TrimPrefix(key, LogPrefix) + matchingIDs = append(matchingIDs, id) + } + + return matchingIDs +} + +// Helper methods for index-based searching +func (p *LoggerPlugin) searchByTimeRange(txn *badger.Txn, startTime, endTime *time.Time) []string { + var ids []string + + opts := badger.DefaultIteratorOptions + opts.PrefetchValues = false + it := txn.NewIterator(opts) + defer it.Close() + + prefix := []byte(IndexPrefix + TimestampIndex) + for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + item := it.Item() + key := string(item.Key()) + + // Extract timestamp from key + parts := strings.Split(strings.TrimPrefix(key, IndexPrefix+TimestampIndex), ":") + if len(parts) >= 2 { + if timestamp, err := strconv.ParseInt(parts[0], 10, 64); err == nil { + logTime := time.Unix(timestamp, 0) + if (startTime == nil || logTime.After(*startTime)) && + (endTime == nil || logTime.Before(*endTime)) { + if err := item.Value(func(val []byte) error { + ids = append(ids, string(val)) + return nil + }); err == nil { + // Continue to next item + } + } + } + } + } + + return ids +} + +func (p *LoggerPlugin) searchByProviders(txn *badger.Txn, providers []string) []string { + idMap := make(map[string]bool) + + for _, provider := range providers { + prefix := []byte(IndexPrefix + ProviderIndex + provider + ":") + opts := badger.DefaultIteratorOptions + opts.PrefetchValues = false + it := txn.NewIterator(opts) + + for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + item := it.Item() + if err := item.Value(func(val []byte) error { + idMap[string(val)] = true + return nil + }); err == nil { + // Continue + } + } + it.Close() + } + + // Convert map to slice + var ids []string + for id := range idMap { + ids = append(ids, id) + } + + return ids +} + +func (p *LoggerPlugin) searchByModels(txn *badger.Txn, models []string) []string { + idMap := make(map[string]bool) + + for _, model := range models { + prefix := []byte(IndexPrefix + ModelIndex + model + ":") + opts := badger.DefaultIteratorOptions + opts.PrefetchValues = false + it := txn.NewIterator(opts) + + for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + item := it.Item() + if err := item.Value(func(val []byte) error { + idMap[string(val)] = true + return nil + }); err == nil { + // Continue + } + } + it.Close() + } + + // Convert map to slice + var ids []string + for id := range idMap { + ids = append(ids, id) + } + + return ids +} + +func (p *LoggerPlugin) searchByStatus(txn *badger.Txn, statuses []string) []string { + idMap := make(map[string]bool) + + for _, status := range statuses { + prefix := []byte(IndexPrefix + StatusIndex + status + ":") + opts := badger.DefaultIteratorOptions + opts.PrefetchValues = false + it := txn.NewIterator(opts) + + for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + item := it.Item() + if err := item.Value(func(val []byte) error { + idMap[string(val)] = true + return nil + }); err == nil { + // Continue + } + } + it.Close() + } + + // Convert map to slice + var ids []string + for id := range idMap { + ids = append(ids, id) + } + + return ids +} + +// intersectIDLists returns the intersection of two ID lists +func (p *LoggerPlugin) intersectIDLists(list1, list2 []string) []string { + if len(list1) == 0 || len(list2) == 0 { + return []string{} + } + + idMap := make(map[string]bool) + for _, id := range list1 { + idMap[id] = true + } + + var result []string + for _, id := range list2 { + if idMap[id] { + result = append(result, id) + } + } + + return result +} + +// getLogEntryByID retrieves a log entry by ID +func (p *LoggerPlugin) getLogEntryByID(txn *badger.Txn, id string) (*LogEntry, error) { + key := LogPrefix + id + item, err := txn.Get([]byte(key)) + if err != nil { + return nil, err + } + + var entry LogEntry + err = item.Value(func(val []byte) error { + return json.Unmarshal(val, &entry) + }) + + return &entry, err +} + +// matchesFilters checks if a log entry matches the given filters +func (p *LoggerPlugin) matchesFilters(entry *LogEntry, filters *SearchFilters) bool { + if filters == nil { + return true + } + + // Provider filter + if len(filters.Providers) > 0 { + found := slices.Contains(filters.Providers, entry.Provider) + if !found { + return false + } + } + + // Model filter + if len(filters.Models) > 0 { + found := slices.Contains(filters.Models, entry.Model) + if !found { + return false + } + } + + // Status filter + if len(filters.Status) > 0 { + found := slices.Contains(filters.Status, entry.Status) + if !found { + return false + } + } + + // Object type filter + if len(filters.Objects) > 0 { + found := slices.Contains(filters.Objects, entry.Object) + if !found { + return false + } + } + + // Time range filter + if filters.StartTime != nil && entry.Timestamp.Before(*filters.StartTime) { + return false + } + if filters.EndTime != nil && entry.Timestamp.After(*filters.EndTime) { + return false + } + + // Latency filter + if entry.Latency != nil { + if filters.MinLatency != nil && *entry.Latency < *filters.MinLatency { + return false + } + if filters.MaxLatency != nil && *entry.Latency > *filters.MaxLatency { + return false + } + } + + // Token count filter + if entry.TokenUsage != nil { + if filters.MinTokens != nil && entry.TokenUsage.TotalTokens < *filters.MinTokens { + return false + } + if filters.MaxTokens != nil && entry.TokenUsage.TotalTokens > *filters.MaxTokens { + return false + } + } + + // Content search + if filters.ContentSearch != "" { + searchTerm := strings.ToLower(filters.ContentSearch) + found := false + + // Search in input history + for _, msg := range entry.InputHistory { + if msg.Content.ContentStr != nil && + strings.Contains(strings.ToLower(*msg.Content.ContentStr), searchTerm) { + found = true + break + } + } + + // Search in input text + if !found && entry.InputText != nil && + strings.Contains(strings.ToLower(*entry.InputText), searchTerm) { + found = true + } + + // Search in output message + if !found && entry.OutputMessage != nil && entry.OutputMessage.Content.ContentStr != nil && + strings.Contains(strings.ToLower(*entry.OutputMessage.Content.ContentStr), searchTerm) { + found = true + } + + if !found { + return false + } + } + + return true +} + +// sortLogs sorts log entries based on the specified criteria +func (p *LoggerPlugin) sortLogs(logs []LogEntry, sortBy, order string) { + sort.Slice(logs, func(i, j int) bool { + var less bool + + switch sortBy { + case "latency": + latencyI := float64(0) + latencyJ := float64(0) + if logs[i].Latency != nil { + latencyI = *logs[i].Latency + } + if logs[j].Latency != nil { + latencyJ = *logs[j].Latency + } + less = latencyI < latencyJ + case "tokens": + tokensI := 0 + tokensJ := 0 + if logs[i].TokenUsage != nil { + tokensI = logs[i].TokenUsage.TotalTokens + } + if logs[j].TokenUsage != nil { + tokensJ = logs[j].TokenUsage.TotalTokens + } + less = tokensI < tokensJ + default: // timestamp + less = logs[i].Timestamp.Before(logs[j].Timestamp) + } + + if order == "desc" { + return !less + } + return less + }) +} + +// sortIDs sorts log IDs based on the specified criteria +func (p *LoggerPlugin) sortIDs(txn *badger.Txn, ids []string, sortBy, order string) { + // Create a map to cache values for sorting + cache := make(map[string]interface{}) + + // Helper function to get cached value + getValue := func(id string) interface{} { + if val, ok := cache[id]; ok { + return val + } + + entry, err := p.getLogEntryByID(txn, id) + if err != nil { + return nil + } + + var value interface{} + switch sortBy { + case "timestamp": + value = entry.Timestamp.Unix() + case "latency": + if entry.Latency != nil { + value = *entry.Latency + } + case "tokens": + if entry.TokenUsage != nil { + value = entry.TokenUsage.TotalTokens + } + } + + cache[id] = value + return value + } + + // Sort the IDs + sort.Slice(ids, func(i, j int) bool { + a := getValue(ids[i]) + b := getValue(ids[j]) + + // Handle nil values + if a == nil { + return order == "desc" + } + if b == nil { + return order == "asc" + } + + // Compare based on type + switch v := a.(type) { + case int64: + if order == "asc" { + return v < b.(int64) + } + return v > b.(int64) + case float64: + if order == "asc" { + return v < b.(float64) + } + return v > b.(float64) + case int: + if order == "asc" { + return v < b.(int) + } + return v > b.(int) + } + + return false + }) +} + +// getLatencyBucket returns the logarithmic bucket (base 10) for a latency value +func getLatencyBucket(latency float64) int { + if latency <= 0 { + return 0 + } + // Use floor(log10(latency)) to get the exponent, then 10^exponent for the bucket + // This creates buckets like: 0-1ms, 1-10ms, 10-100ms, 100-1000ms, etc. + exponent := math.Floor(math.Log10(latency)) + return int(math.Pow(10, exponent)) +} + +// getTokenBucket returns the power-of-2 bucket for a token count +func getTokenBucket(tokens int) int { + if tokens <= 0 { + return 0 + } + // Use floor(log2(tokens)) to get the exponent, then 2^exponent for the bucket + // This creates buckets like: 0-1, 1-2, 2-4, 4-8, 8-16, 16-32, etc. + exponent := math.Floor(math.Log2(float64(tokens))) + return int(math.Pow(2, exponent)) +} + +// LogManager defines the main interface that combines all logging functionality +type LogManager interface { + // Search searches for log entries based on filters and pagination + Search(filters *SearchFilters, pagination *PaginationOptions) (*SearchResult, error) +} + +type PluginLogManager struct { + plugin *LoggerPlugin +} + +func (p *PluginLogManager) Search(filters *SearchFilters, pagination *PaginationOptions) (*SearchResult, error) { + return p.plugin.SearchLogs(filters, pagination) +} + +func (p *LoggerPlugin) GetPluginLogManager() *PluginLogManager { + return &PluginLogManager{ + plugin: p, + } +} diff --git a/transports/bifrost-http/tracking/docker-compose.yml b/transports/bifrost-http/plugins/telemetry/docker-compose.yml similarity index 100% rename from transports/bifrost-http/tracking/docker-compose.yml rename to transports/bifrost-http/plugins/telemetry/docker-compose.yml diff --git a/transports/bifrost-http/tracking/plugin.go b/transports/bifrost-http/plugins/telemetry/main.go similarity index 95% rename from transports/bifrost-http/tracking/plugin.go rename to transports/bifrost-http/plugins/telemetry/main.go index 730387b817..77383e452f 100644 --- a/transports/bifrost-http/tracking/plugin.go +++ b/transports/bifrost-http/plugins/telemetry/main.go @@ -1,7 +1,7 @@ -// Package tracking provides Prometheus metrics collection and monitoring functionality +// Package telemetry provides Prometheus metrics collection and monitoring functionality // for the Bifrost HTTP service. It includes middleware for HTTP request tracking // and a plugin for tracking upstream provider metrics. -package tracking +package telemetry import ( "context" @@ -13,6 +13,10 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +const ( + PluginName = "bifrost-http-telemetry" +) + // Define context key type for storing start time type contextKey string @@ -43,7 +47,7 @@ func NewPrometheusPlugin() *PrometheusPlugin { // GetName returns the name of the plugin. func (p *PrometheusPlugin) GetName() string { - return "bifrost-http-prometheus" + return PluginName } // PreHook records the start time of the request in the context. diff --git a/transports/bifrost-http/tracking/prometheus.yml b/transports/bifrost-http/plugins/telemetry/prometheus.yml similarity index 100% rename from transports/bifrost-http/tracking/prometheus.yml rename to transports/bifrost-http/plugins/telemetry/prometheus.yml diff --git a/transports/bifrost-http/tracking/setup.go b/transports/bifrost-http/plugins/telemetry/setup.go similarity index 98% rename from transports/bifrost-http/tracking/setup.go rename to transports/bifrost-http/plugins/telemetry/setup.go index c65499981e..2dd85bb162 100644 --- a/transports/bifrost-http/tracking/setup.go +++ b/transports/bifrost-http/plugins/telemetry/setup.go @@ -1,7 +1,7 @@ -// Package tracking provides Prometheus metrics collection and monitoring functionality +// Package telemetry provides Prometheus metrics collection and monitoring functionality // for the Bifrost HTTP service. This file contains the setup and configuration // for Prometheus metrics collection, including HTTP middleware and metric definitions. -package tracking +package telemetry import ( "log" diff --git a/transports/go.mod b/transports/go.mod index 90b1a2245d..68d090bba6 100644 --- a/transports/go.mod +++ b/transports/go.mod @@ -3,7 +3,10 @@ module github.com/maximhq/bifrost/transports go 1.24.1 require ( + github.com/dgraph-io/badger/v4 v4.7.0 github.com/fasthttp/router v1.5.4 + github.com/fasthttp/websocket v1.5.12 + github.com/google/uuid v1.6.0 github.com/maximhq/bifrost/core v1.1.6 github.com/maximhq/bifrost/plugins/maxim v1.0.6 github.com/prometheus/client_golang v1.22.0 @@ -33,13 +36,15 @@ require ( github.com/aws/smithy-go v1.22.3 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/dgraph-io/ristretto/v2 v2.2.0 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/goccy/go-json v0.10.5 // indirect + github.com/google/flatbuffers v25.2.10+incompatible // indirect github.com/google/go-cmp v0.7.0 // indirect github.com/google/s2a-go v0.1.9 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect github.com/googleapis/gax-go/v2 v2.14.1 // indirect github.com/gorilla/websocket v1.5.3 // indirect diff --git a/transports/go.sum b/transports/go.sum index a933cab410..3c39a74e4e 100644 --- a/transports/go.sum +++ b/transports/go.sum @@ -38,8 +38,18 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgraph-io/badger/v4 v4.7.0 h1:Q+J8HApYAY7UMpL8d9owqiB+odzEc0zn/aqOD9jhc6Y= +github.com/dgraph-io/badger/v4 v4.7.0/go.mod h1:He7TzG3YBy3j4f5baj5B7Zl2XyfNe5bl4Udl0aPemVA= +github.com/dgraph-io/ristretto/v2 v2.2.0 h1:bkY3XzJcXoMuELV8F+vS8kzNgicwQFAaGINAEJdWGOM= +github.com/dgraph-io/ristretto/v2 v2.2.0/go.mod h1:RZrm63UmcBAaYWC1DotLYBmTvgkrs0+XhBd7Npn7/zI= +github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da h1:aIftn67I1fkbMa512G+w+Pxci9hJPB8oMnkcP3iZF38= +github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/fasthttp/router v1.5.4 h1:oxdThbBwQgsDIYZ3wR1IavsNl6ZS9WdjKukeMikOnC8= github.com/fasthttp/router v1.5.4/go.mod h1:3/hysWq6cky7dTfzaaEPZGdptwjwx0qzTgFCKEWRjgc= +github.com/fasthttp/websocket v1.5.12 h1:e4RGPpWW2HTbL3zV0Y/t7g0ub294LkiuXXUuTOUInlE= +github.com/fasthttp/websocket v1.5.12/go.mod h1:I+liyL7/4moHojiOgUOIKEWm9EIxHqxZChS+aMFltyg= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= @@ -53,6 +63,8 @@ github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/flatbuffers v25.2.10+incompatible h1:F3vclr7C3HpB1k9mxCGRMXq6FdUalZ6H/pNX4FP1v0Q= +github.com/google/flatbuffers v25.2.10+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/s2a-go v0.1.9 h1:LGD7gtMgezd8a/Xak7mEWL0PjoTQFvpRudN895yqKW0=