diff --git a/.gitignore b/.gitignore
index 9a7cb7bb23..8a6c0cd095 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,6 +6,7 @@
 **/__pycache__/**
 private.*
 .venv
+bifrost-data
 
 # Temporary directories
 **/temp/
diff --git a/docs/quickstart/http-transport.md b/docs/quickstart/http-transport.md
index 3b90a64727..8c64cc59cf 100644
--- a/docs/quickstart/http-transport.md
+++ b/docs/quickstart/http-transport.md
@@ -69,6 +69,11 @@ docker run -p 8080:8080 \
 npx @maximhq/bifrost -port 8080
 ```
 
+> **🔄 Smart Configuration Loading**: Bifrost intelligently manages configuration sources:
+> - **If `config.json` exists**: Bifrost checks whether the file has changed. If unchanged, it loads from the database (fast path). If changed, it uses the file as the source of truth and syncs it to the database.
+> - **Without `config.json`**: Bifrost loads configuration from the database only.
+> - **Web UI changes**: These always update the database, making it the source of truth for subsequent loads.
+
 ---
 
 ## 📁 Understanding App Directory & Docker Volumes
@@ -92,6 +97,7 @@ npx @maximhq/bifrost -port 8080
 
 - `config.json` - Configuration file (if using file-based config)
 - `logs/` - Database logs and request history
+- Database files - Configuration data and hash tracking
 - Any other persistent data
 
 ### **How Docker Volumes Work with App Directory**
@@ -114,8 +120,9 @@ docker run -p 8080:8080 maximhq/bifrost
 
 | Scenario                     | Command                                                       | Result                                    |
 | ---------------------------- | ------------------------------------------------------------- | ----------------------------------------- |
 | **Ephemeral (testing)**      | `docker run -p 8080:8080 maximhq/bifrost`                     | No persistence, configure via web UI      |
-| **Persistent (recommended)** | `docker run -p 8080:8080 -v $(pwd):/app/data maximhq/bifrost` | Saves config & logs to host directory     |
+| **Persistent (recommended)** | `docker run -p 8080:8080 -v $(pwd):/app/data maximhq/bifrost` | Saves config, logs & DB to host directory |
 | **Pre-configured**           | Create `config.json`, then run with volume                    | Starts with your existing configuration   |
+| **Web UI configured**        | Configure via web UI, then restart                            | Database becomes source of truth          |
 
 ### **Best Practices**
 
@@ -123,6 +130,7 @@ docker run -p 8080:8080 maximhq/bifrost
 - **🚀 Production**: Mount dedicated volume for data persistence
 - **🧪 Testing**: Run without volume for clean ephemeral instances
 - **👥 Teams**: Share `config.json` in version control, mount directory with volume
+- **⚠️ Important**: After configuring via the web UI, your `config.json` may become outdated. The database becomes the source of truth once you make changes through the UI.
 
 ### 3. Test the API
 
@@ -142,6 +150,34 @@ curl -X POST http://localhost:8080/v1/chat/completions \
 
 ---
 
+## 🔄 Configuration Loading Behavior
+
+Bifrost intelligently manages configuration sources to keep your settings up to date; the file check is sketched below:
+
+### **When `config.json` exists:**
+1. **File unchanged**: Loads from the database (fast path)
+2. **File modified**: Uses `config.json` as the source of truth, syncs to the database
+3. **First time**: Uses `config.json` as the source of truth, syncs to the database
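+
+Internally, Bifrost hashes `config.json` with SHA-256 and compares the result against the last hash stored in the database. A rough shell equivalent of the startup decision (illustration only; `last-known-hash` stands in for the hash row Bifrost keeps in its config database):
+
+```bash
+current=$(sha256sum config.json | cut -d' ' -f1)
+stored=$(cat last-known-hash 2>/dev/null)
+if [ "$current" = "$stored" ]; then
+  echo "unchanged -> load from database (fast path)"
+else
+  echo "changed or first run -> load from file, sync to database"
+fi
+```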
+
+### **When no `config.json` exists:**
+
+- Loads configuration from the database only
+- If the database is empty, starts with the default configuration
+
+### **Web UI Configuration:**
+
+- All changes made via the web UI update the database
+- The database becomes the source of truth for subsequent loads
+- Your `config.json` may become outdated if you configure via the web UI
+
+### **Important Notes:**
+
+- **The database is the source of truth** after web UI changes, until `config.json` is modified again
+- **File changes take precedence**: a modified `config.json` overrides the database on the next startup
+- **No data loss**: configuration is always preserved in the database
+
+---
+
 ## 🔄 Drop-in Integrations (Zero Code Changes!)
 
 **Already using OpenAI, Anthropic, or Google GenAI?** Get instant benefits with **zero code changes**:
@@ -319,7 +355,7 @@ response, err := http.Post(
 | **Docker** | No Go installation needed, isolated environment | Production, CI/CD, quick testing |
 | **Binary** | Direct execution, easier debugging              | Development, custom builds       |
 
-**Note:** When using file-based config, Bifrost only looks for `config.json` in your specified app directory.
+**Note:** When using file-based config, Bifrost only looks for `config.json` in your specified app directory. The database tracks file changes to optimize loading performance.
 
 ---
 
diff --git a/docs/usage/http-transport/configuration/providers.md b/docs/usage/http-transport/configuration/providers.md
index 3f3687755f..6dc7734e6b 100644
--- a/docs/usage/http-transport/configuration/providers.md
+++ b/docs/usage/http-transport/configuration/providers.md
@@ -6,10 +6,26 @@ Complete guide to configuring AI providers in Bifrost HTTP transport through `co
 
 ---
 
-## 📋 Configuration Overview (File Based)
+## 📋 Configuration Overview
 
 > You can directly use the UI (`http://localhost:{port}/providers`) to configure the providers.
 
+Provider configuration can be managed through:
+
+- **`config.json` file** - File-based configuration with intelligent loading
+- **Web UI** - Visual configuration interface
+- **Database** - Persistent storage with automatic synchronization
+
+### **Configuration Loading Behavior**
+
+Bifrost intelligently manages configuration sources:
+
+- **If `config.json` exists**: Checks whether the file has changed. If unchanged, loads from the database (fast path). If changed, uses the file as the source of truth and syncs it to the database.
+- **If no `config.json`**: Loads configuration from the database only.
+- **Web UI changes**: Always update the database, making it the source of truth for subsequent loads.
+
+> **⚠️ Important**: After configuring via the web UI, your `config.json` may become outdated. The database becomes the source of truth once you make changes through the UI.
+
 Provider configuration in `config.json` defines:
 
 - **API credentials** and key management
@@ -485,6 +501,16 @@ export MISTRAL_API_KEY="your-mistral-key"
 
 ### **Docker Environment**
 
 ```bash
+# With persistent configuration
+docker run -p 8080:8080 \
+  -v $(pwd):/app/data \
+  -e OPENAI_API_KEY \
+  -e ANTHROPIC_API_KEY \
+  -e BEDROCK_API_KEY \
+  -e AWS_SECRET_ACCESS_KEY \
+  maximhq/bifrost
+
+# Legacy: Direct config.json mount
 docker run -p 8080:8080 \
   -v $(pwd)/config.json:/app/config/config.json \
   -e OPENAI_API_KEY \
@@ -494,6 +520,8 @@ docker run -p 8080:8080 \
   maximhq/bifrost
 ```
 
+> **💡 Note**: The recommended approach uses `-v $(pwd):/app/data` to persist both the config file and the database. This ensures configuration changes made via the web UI are preserved between container restarts.
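+
+After the first run with the recommended mount, the host directory holds everything Bifrost persists. A typical listing (illustrative; exact files may vary by version and settings):
+
+```bash
+$ ls ./
+config.json   # configuration file, kept in sync with the database
+config.db     # configuration database, including file-hash tracking
+logs.db       # request/response logs (created only when logging is enabled)
+```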
+
 ---
 
 ## 🧪 Testing Configuration
 
diff --git a/docs/usage/http-transport/integrations/README.md b/docs/usage/http-transport/integrations/README.md
index ed358b70d0..a8f88a1a63 100644
--- a/docs/usage/http-transport/integrations/README.md
+++ b/docs/usage/http-transport/integrations/README.md
@@ -265,7 +265,7 @@ services:
     ports:
       - "8080:8080"
     volumes:
-      - ./config.json:/app/config/config.json
+      - ./data:/app/data # Recommended: persist both config and database
     environment:
       - OPENAI_API_KEY
       - ANTHROPIC_API_KEY
diff --git a/plugins/maxim/README.md b/plugins/maxim/README.md
index 0563e8d9db..c1279e2103 100644
--- a/plugins/maxim/README.md
+++ b/plugins/maxim/README.md
@@ -50,10 +50,12 @@ This plugin integrates the Maxim SDK into Bifrost, enabling seamless observabili
 
 Running the docker container
 
+> **💡 Volume Mounting**: The entire working directory is mounted to `/app/data` to persist both the JSON configuration file and the database. This preserves configuration changes made via the web UI across container restarts and lets the hash-based configuration loading system track file changes.
+
 ```bash
 docker run -d \
   -p 8080:8080 \
-  -v $(pwd)/config.json:/app/config/config.json \
+  -v $(pwd):/app/data \
   -e APP_PORT=8080 \
   -e MAXIM_API_KEY \
   -e MAXIM_LOG_REPO_ID \
diff --git a/transports/bifrost-http/handlers/config.go b/transports/bifrost-http/handlers/config.go
index 8589382d7e..5cf00b7b73 100644
--- a/transports/bifrost-http/handlers/config.go
+++ b/transports/bifrost-http/handlers/config.go
@@ -13,22 +13,20 @@ import (
 )
 
 // ConfigHandler manages runtime configuration updates for Bifrost.
-// It provides an endpoint to hot-reload settings from the configuration file.
+// It provides endpoints to update and retrieve settings persisted via the ConfigStore, which is backed by a SQL database.
 type ConfigHandler struct {
-	client     *bifrost.Bifrost
-	logger     schemas.Logger
-	store      *lib.ConfigStore
-	configPath string
+	client *bifrost.Bifrost
+	logger schemas.Logger
+	store  *lib.ConfigStore
 }
 
 // NewConfigHandler creates a new handler for configuration management.
-// It requires the Bifrost client, a logger, and the path to the config file to be reloaded.
+// It requires the Bifrost client, a logger, and the config store.
+func NewConfigHandler(client *bifrost.Bifrost, logger schemas.Logger, store *lib.ConfigStore) *ConfigHandler { return &ConfigHandler{ - client: client, - logger: logger, - store: store, - configPath: configPath, + client: client, + logger: logger, + store: store, } } diff --git a/transports/bifrost-http/handlers/websocket.go b/transports/bifrost-http/handlers/websocket.go index 5aee37aae4..6efa51a2c0 100644 --- a/transports/bifrost-http/handlers/websocket.go +++ b/transports/bifrost-http/handlers/websocket.go @@ -17,11 +17,17 @@ import ( "github.com/valyala/fasthttp" ) +// WebSocketClient represents a connected WebSocket client with its own mutex +type WebSocketClient struct { + conn *websocket.Conn + mu sync.Mutex // Per-connection mutex for thread-safe writes +} + // WebSocketHandler manages WebSocket connections for real-time updates type WebSocketHandler struct { logManager logging.LogManager logger schemas.Logger - clients map[*websocket.Conn]bool + clients map[*websocket.Conn]*WebSocketClient mu sync.RWMutex stopChan chan struct{} // Channel to signal heartbeat goroutine to stop done chan struct{} // Channel to signal when heartbeat goroutine has stopped @@ -32,7 +38,7 @@ func NewWebSocketHandler(logManager logging.LogManager, logger schemas.Logger) * return &WebSocketHandler{ logManager: logManager, logger: logger, - clients: make(map[*websocket.Conn]bool), + clients: make(map[*websocket.Conn]*WebSocketClient), stopChan: make(chan struct{}), done: make(chan struct{}), } @@ -83,9 +89,14 @@ func isLocalhost(host string) bool { // HandleLogStream handles WebSocket connections for real-time log streaming func (h *WebSocketHandler) HandleLogStream(ctx *fasthttp.RequestCtx) { err := upgrader.Upgrade(ctx, func(ws *websocket.Conn) { + // Create a new client with its own mutex + client := &WebSocketClient{ + conn: ws, + } + // Register new client h.mu.Lock() - h.clients[ws] = true + h.clients[ws] = client h.mu.Unlock() // Clean up on disconnect @@ -123,8 +134,37 @@ func (h *WebSocketHandler) HandleLogStream(ctx *fasthttp.RequestCtx) { } } +// sendMessageSafely sends a message to a client with proper locking and error handling +func (h *WebSocketHandler) sendMessageSafely(client *WebSocketClient, messageType int, data []byte) error { + client.mu.Lock() + defer client.mu.Unlock() + + // Set a write deadline to prevent hanging connections + client.conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) + defer client.conn.SetWriteDeadline(time.Time{}) // Clear the deadline + + err := client.conn.WriteMessage(messageType, data) + if err != nil { + // Remove the client from the map if write fails + go func() { + h.mu.Lock() + delete(h.clients, client.conn) + h.mu.Unlock() + client.conn.Close() + }() + } + return err +} + // BroadcastLogUpdate sends a log update to all connected WebSocket clients func (h *WebSocketHandler) BroadcastLogUpdate(logEntry *logging.LogEntry) { + // Add panic recovery to prevent server crashes + defer func() { + if r := recover(); r != nil { + h.logger.Error(fmt.Errorf("panic in BroadcastLogUpdate: %v", r)) + } + }() + // Determine operation type based on log status and timestamp operationType := "update" if logEntry.Status == "processing" && logEntry.CreatedAt.Equal(logEntry.Timestamp) { @@ -147,14 +187,18 @@ func (h *WebSocketHandler) BroadcastLogUpdate(logEntry *logging.LogEntry) { return } + // Get a snapshot of clients to avoid holding the lock during writes h.mu.RLock() - defer h.mu.RUnlock() + clients := make([]*WebSocketClient, 0, len(h.clients)) + for _, client 
:= range h.clients { + clients = append(clients, client) + } + h.mu.RUnlock() - for client := range h.clients { - err := client.WriteMessage(websocket.TextMessage, data) - if err != nil { + // Send message to each client safely + for _, client := range clients { + if err := h.sendMessageSafely(client, websocket.TextMessage, data); err != nil { h.logger.Error(fmt.Errorf("failed to send message to client: %v", err)) - continue } } } @@ -171,14 +215,20 @@ func (h *WebSocketHandler) StartHeartbeat() { for { select { case <-ticker.C: + // Get a snapshot of clients to avoid holding the lock during writes h.mu.RLock() - for client := range h.clients { - err := client.WriteMessage(websocket.PingMessage, nil) - if err != nil { + clients := make([]*WebSocketClient, 0, len(h.clients)) + for _, client := range h.clients { + clients = append(clients, client) + } + h.mu.RUnlock() + + // Send heartbeat to each client safely + for _, client := range clients { + if err := h.sendMessageSafely(client, websocket.PingMessage, nil); err != nil { h.logger.Error(fmt.Errorf("failed to send heartbeat: %v", err)) } } - h.mu.RUnlock() case <-h.stopChan: return } @@ -193,9 +243,9 @@ func (h *WebSocketHandler) Stop() { // Close all client connections h.mu.Lock() - for client := range h.clients { - client.Close() + for _, client := range h.clients { + client.conn.Close() } - h.clients = make(map[*websocket.Conn]bool) + h.clients = make(map[*websocket.Conn]*WebSocketClient) h.mu.Unlock() } diff --git a/transports/bifrost-http/lib/models.go b/transports/bifrost-http/lib/models.go new file mode 100644 index 0000000000..e2730cee72 --- /dev/null +++ b/transports/bifrost-http/lib/models.go @@ -0,0 +1,338 @@ +// Package lib provides GORM model definitions for Bifrost configuration storage +package lib + +import ( + "encoding/json" + "time" + + "github.com/maximhq/bifrost/core/schemas" + "github.com/maximhq/bifrost/core/schemas/meta" + "gorm.io/gorm" +) + +type DBConfigHash struct { + ID uint `gorm:"primaryKey;autoIncrement" json:"id"` + Hash string `gorm:"type:varchar(255);uniqueIndex;not null" json:"hash"` + CreatedAt time.Time `gorm:"index;not null" json:"created_at"` + UpdatedAt time.Time `gorm:"index;not null" json:"updated_at"` +} + +// DBProvider represents a provider configuration in the database +type DBProvider struct { + ID uint `gorm:"primaryKey;autoIncrement" json:"id"` + Name string `gorm:"type:varchar(50);uniqueIndex;not null" json:"name"` // ModelProvider as string + NetworkConfigJSON string `gorm:"type:text" json:"-"` // JSON serialized schemas.NetworkConfig + ConcurrencyBufferJSON string `gorm:"type:text" json:"-"` // JSON serialized schemas.ConcurrencyAndBufferSize + MetaConfigJSON string `gorm:"type:text" json:"-"` // JSON serialized schemas.MetaConfig + MetaConfigType string `gorm:"type:varchar(20)" json:"-"` // Type of meta config ("bedrock", etc.) 
+ CreatedAt time.Time `gorm:"index;not null" json:"created_at"` + UpdatedAt time.Time `gorm:"index;not null" json:"updated_at"` + + // Relationships + Keys []DBKey `gorm:"foreignKey:ProviderID;constraint:OnDelete:CASCADE" json:"keys"` + + // Virtual fields for runtime use (not stored in DB) + NetworkConfig *schemas.NetworkConfig `gorm:"-" json:"network_config,omitempty"` + ConcurrencyAndBufferSize *schemas.ConcurrencyAndBufferSize `gorm:"-" json:"concurrency_and_buffer_size,omitempty"` + MetaConfig *schemas.MetaConfig `gorm:"-" json:"meta_config,omitempty"` +} + +// DBKey represents an API key configuration in the database +type DBKey struct { + ID uint `gorm:"primaryKey;autoIncrement" json:"id"` + ProviderID uint `gorm:"index;not null" json:"provider_id"` + KeyID string `gorm:"type:varchar(255);index;not null" json:"key_id"` // UUID from schemas.Key + Value string `gorm:"type:text;not null" json:"value"` + ModelsJSON string `gorm:"type:text" json:"-"` // JSON serialized []string + Weight float64 `gorm:"default:1.0" json:"weight"` + CreatedAt time.Time `gorm:"index;not null" json:"created_at"` + UpdatedAt time.Time `gorm:"index;not null" json:"updated_at"` + + // Azure config fields (embedded instead of separate table for simplicity) + AzureEndpoint *string `gorm:"type:text" json:"azure_endpoint,omitempty"` + AzureAPIVersion *string `gorm:"type:varchar(50)" json:"azure_api_version,omitempty"` + AzureDeploymentsJSON *string `gorm:"type:text" json:"-"` // JSON serialized map[string]string + + // Vertex config fields (embedded) + VertexProjectID *string `gorm:"type:varchar(255)" json:"vertex_project_id,omitempty"` + VertexRegion *string `gorm:"type:varchar(100)" json:"vertex_region,omitempty"` + VertexAuthCredentials *string `gorm:"type:text" json:"vertex_auth_credentials,omitempty"` + + // Virtual fields for runtime use (not stored in DB) + Models []string `gorm:"-" json:"models"` + AzureKeyConfig *schemas.AzureKeyConfig `gorm:"-" json:"azure_key_config,omitempty"` + VertexKeyConfig *schemas.VertexKeyConfig `gorm:"-" json:"vertex_key_config,omitempty"` +} + +// DBMCPClient represents an MCP client configuration in the database +type DBMCPClient struct { + ID uint `gorm:"primaryKey;autoIncrement" json:"id"` + Name string `gorm:"type:varchar(255);uniqueIndex;not null" json:"name"` + ConnectionType string `gorm:"type:varchar(20);not null" json:"connection_type"` // schemas.MCPConnectionType + ConnectionString *string `gorm:"type:text" json:"connection_string,omitempty"` + StdioConfigJSON *string `gorm:"type:text" json:"-"` // JSON serialized schemas.MCPStdioConfig + ToolsToExecuteJSON string `gorm:"type:text" json:"-"` // JSON serialized []string + ToolsToSkipJSON string `gorm:"type:text" json:"-"` // JSON serialized []string + CreatedAt time.Time `gorm:"index;not null" json:"created_at"` + UpdatedAt time.Time `gorm:"index;not null" json:"updated_at"` + + // Virtual fields for runtime use (not stored in DB) + StdioConfig *schemas.MCPStdioConfig `gorm:"-" json:"stdio_config,omitempty"` + ToolsToExecute []string `gorm:"-" json:"tools_to_execute"` + ToolsToSkip []string `gorm:"-" json:"tools_to_skip"` +} + +// DBClientConfig represents global client configuration in the database +type DBClientConfig struct { + ID uint `gorm:"primaryKey;autoIncrement" json:"id"` + DropExcessRequests bool `gorm:"default:false" json:"drop_excess_requests"` + PrometheusLabelsJSON string `gorm:"type:text" json:"-"` // JSON serialized []string + InitialPoolSize int `gorm:"default:300" json:"initial_pool_size"` + 
EnableLogging bool `json:"enable_logging"` + CreatedAt time.Time `gorm:"index;not null" json:"created_at"` + UpdatedAt time.Time `gorm:"index;not null" json:"updated_at"` + + // Virtual fields for runtime use (not stored in DB) + PrometheusLabels []string `gorm:"-" json:"prometheus_labels"` +} + +// DBEnvKey represents environment variable tracking in the database +type DBEnvKey struct { + ID uint `gorm:"primaryKey;autoIncrement" json:"id"` + EnvVar string `gorm:"type:varchar(255);index;not null" json:"env_var"` + Provider string `gorm:"type:varchar(50);index" json:"provider"` // Empty for MCP/client configs + KeyType string `gorm:"type:varchar(50);not null" json:"key_type"` // "api_key", "azure_config", "vertex_config", "meta_config", "connection_string" + ConfigPath string `gorm:"type:varchar(500);not null" json:"config_path"` // Descriptive path of where this env var is used + KeyID string `gorm:"type:varchar(255);index" json:"key_id"` // Key UUID (empty for non-key configs) + CreatedAt time.Time `gorm:"index;not null" json:"created_at"` +} + +// TableName sets the table name for each model +func (DBConfigHash) TableName() string { return "config_hashes" } +func (DBProvider) TableName() string { return "config_providers" } +func (DBKey) TableName() string { return "config_keys" } +func (DBMCPClient) TableName() string { return "config_mcp_clients" } +func (DBClientConfig) TableName() string { return "config_client" } +func (DBEnvKey) TableName() string { return "config_env_keys" } + +// GORM Hooks for JSON serialization/deserialization + +// BeforeSave hooks for serialization +func (p *DBProvider) BeforeSave(tx *gorm.DB) error { + if p.NetworkConfig != nil { + data, err := json.Marshal(p.NetworkConfig) + if err != nil { + return err + } + p.NetworkConfigJSON = string(data) + } + + if p.ConcurrencyAndBufferSize != nil { + data, err := json.Marshal(p.ConcurrencyAndBufferSize) + if err != nil { + return err + } + p.ConcurrencyBufferJSON = string(data) + } + + if p.MetaConfig != nil { + data, err := json.Marshal(*p.MetaConfig) + if err != nil { + return err + } + p.MetaConfigJSON = string(data) + + // Set meta config type for proper deserialization + switch (*p.MetaConfig).(type) { + case *meta.BedrockMetaConfig: + p.MetaConfigType = "bedrock" + default: + + } + } + + return nil +} + +func (k *DBKey) BeforeSave(tx *gorm.DB) error { + if k.Models != nil { + data, err := json.Marshal(k.Models) + if err != nil { + return err + } + k.ModelsJSON = string(data) + } + + if k.AzureKeyConfig != nil && k.AzureKeyConfig.Deployments != nil { + data, err := json.Marshal(k.AzureKeyConfig.Deployments) + if err != nil { + return err + } + deployments := string(data) + k.AzureDeploymentsJSON = &deployments + } + + return nil +} + +func (c *DBMCPClient) BeforeSave(tx *gorm.DB) error { + if c.StdioConfig != nil { + data, err := json.Marshal(c.StdioConfig) + if err != nil { + return err + } + config := string(data) + c.StdioConfigJSON = &config + } + + if c.ToolsToExecute != nil { + data, err := json.Marshal(c.ToolsToExecute) + if err != nil { + return err + } + c.ToolsToExecuteJSON = string(data) + } else { + c.ToolsToExecuteJSON = "[]" + } + + if c.ToolsToSkip != nil { + data, err := json.Marshal(c.ToolsToSkip) + if err != nil { + return err + } + c.ToolsToSkipJSON = string(data) + } else { + c.ToolsToSkipJSON = "[]" + } + + return nil +} + +func (cc *DBClientConfig) BeforeSave(tx *gorm.DB) error { + if cc.PrometheusLabels != nil { + data, err := json.Marshal(cc.PrometheusLabels) + if err != nil { + return 
err + } + cc.PrometheusLabelsJSON = string(data) + } + + return nil +} + +// AfterFind hooks for deserialization +func (p *DBProvider) AfterFind(tx *gorm.DB) error { + if p.NetworkConfigJSON != "" { + var config schemas.NetworkConfig + if err := json.Unmarshal([]byte(p.NetworkConfigJSON), &config); err != nil { + return err + } + p.NetworkConfig = &config + } + + if p.ConcurrencyBufferJSON != "" { + var config schemas.ConcurrencyAndBufferSize + if err := json.Unmarshal([]byte(p.ConcurrencyBufferJSON), &config); err != nil { + return err + } + p.ConcurrencyAndBufferSize = &config + } + + if p.MetaConfigJSON != "" { + var metaConfig schemas.MetaConfig + + switch p.MetaConfigType { + case "bedrock": + var bedrockConfig meta.BedrockMetaConfig + if err := json.Unmarshal([]byte(p.MetaConfigJSON), &bedrockConfig); err != nil { + return err + } + metaConfig = &bedrockConfig + default: + // Unknown meta config type, skip + return nil + } + + p.MetaConfig = &metaConfig + } + + return nil +} + +func (k *DBKey) AfterFind(tx *gorm.DB) error { + if k.ModelsJSON != "" { + if err := json.Unmarshal([]byte(k.ModelsJSON), &k.Models); err != nil { + return err + } + } + + // Reconstruct Azure config if fields are present + if k.AzureEndpoint != nil { + azureConfig := &schemas.AzureKeyConfig{ + Endpoint: *k.AzureEndpoint, + APIVersion: k.AzureAPIVersion, + } + + if k.AzureDeploymentsJSON != nil { + var deployments map[string]string + if err := json.Unmarshal([]byte(*k.AzureDeploymentsJSON), &deployments); err != nil { + return err + } + azureConfig.Deployments = deployments + } + + k.AzureKeyConfig = azureConfig + } + + // Reconstruct Vertex config if fields are present + if k.VertexProjectID != nil { + config := &schemas.VertexKeyConfig{ + ProjectID: *k.VertexProjectID, + } + + if k.VertexRegion != nil { + config.Region = *k.VertexRegion + } + if k.VertexAuthCredentials != nil { + config.AuthCredentials = *k.VertexAuthCredentials + } + + k.VertexKeyConfig = config + } + + return nil +} + +func (c *DBMCPClient) AfterFind(tx *gorm.DB) error { + if c.StdioConfigJSON != nil { + var config schemas.MCPStdioConfig + if err := json.Unmarshal([]byte(*c.StdioConfigJSON), &config); err != nil { + return err + } + c.StdioConfig = &config + } + + if c.ToolsToExecuteJSON != "" { + if err := json.Unmarshal([]byte(c.ToolsToExecuteJSON), &c.ToolsToExecute); err != nil { + return err + } + } + + if c.ToolsToSkipJSON != "" { + if err := json.Unmarshal([]byte(c.ToolsToSkipJSON), &c.ToolsToSkip); err != nil { + return err + } + } + + return nil +} + +func (cc *DBClientConfig) AfterFind(tx *gorm.DB) error { + if cc.PrometheusLabelsJSON != "" { + if err := json.Unmarshal([]byte(cc.PrometheusLabelsJSON), &cc.PrometheusLabels); err != nil { + return err + } + } + + return nil +} diff --git a/transports/bifrost-http/lib/store.go b/transports/bifrost-http/lib/store.go index 6ca88f88bc..4200dd247c 100644 --- a/transports/bifrost-http/lib/store.go +++ b/transports/bifrost-http/lib/store.go @@ -3,35 +3,40 @@ package lib import ( + "crypto/sha256" + "encoding/hex" "encoding/json" + "errors" "fmt" "os" "strings" "sync" + "time" "github.com/google/uuid" bifrost "github.com/maximhq/bifrost/core" "github.com/maximhq/bifrost/core/schemas" "github.com/maximhq/bifrost/core/schemas/meta" + "gorm.io/gorm" ) // ConfigStore represents a high-performance in-memory configuration store for Bifrost. -// It provides thread-safe access to provider configurations with the ability to -// persist changes back to the original JSON configuration file. 
+// It provides thread-safe access to provider configurations with database persistence. // // Features: // - Pure in-memory storage for ultra-fast access // - Environment variable processing for API keys and key-level configurations // - Thread-safe operations with read-write mutexes // - Real-time configuration updates via HTTP API -// - Explicit persistence control via WriteConfigToFile() +// - Automatic database persistence for all changes // - Support for provider-specific key configurations (Azure, Vertex) and meta configurations (Bedrock) type ConfigStore struct { mu sync.RWMutex muMCP sync.RWMutex logger schemas.Logger - configPath string // Path to the original JSON config file + db *gorm.DB // GORM database connection client *bifrost.Bifrost + configPath string // Path to the config file // In-memory storage ClientConfig ClientConfig @@ -58,13 +63,26 @@ var DefaultClientConfig = ClientConfig{ EnableLogging: true, } -// NewConfigStore creates a new in-memory configuration store instance. -func NewConfigStore(logger schemas.Logger) (*ConfigStore, error) { - return &ConfigStore{ - logger: logger, - Providers: make(map[schemas.ModelProvider]ProviderConfig), - EnvKeys: make(map[string][]EnvKeyInfo), - }, nil +// NewConfigStore creates a new in-memory configuration store instance with database connection. +func NewConfigStore(logger schemas.Logger, db *gorm.DB, configPath string) (*ConfigStore, error) { + if db == nil { + return nil, fmt.Errorf("database connection cannot be nil") + } + + store := &ConfigStore{ + logger: logger, + db: db, + configPath: configPath, + Providers: make(map[schemas.ModelProvider]ProviderConfig), + EnvKeys: make(map[string][]EnvKeyInfo), + } + + // Auto-migrate database tables + if err := store.autoMigrate(); err != nil { + return nil, fmt.Errorf("failed to auto-migrate tables: %w", err) + } + + return store, nil } // LoadFromConfig loads initial configuration from a JSON config file into memory @@ -83,8 +101,6 @@ func NewConfigStore(logger schemas.Logger) (*ConfigStore, error) { // - In-memory storage for ultra-fast access during request processing // - Graceful handling of missing config files func (s *ConfigStore) LoadFromConfig(configPath string) error { - s.mu.Lock() - defer s.mu.Unlock() s.configPath = configPath s.logger.Info(fmt.Sprintf("Loading configuration from: %s", configPath)) @@ -93,18 +109,7 @@ func (s *ConfigStore) LoadFromConfig(configPath string) error { data, err := os.ReadFile(configPath) if err != nil { if os.IsNotExist(err) { - s.logger.Info(fmt.Sprintf("Config file %s not found, starting with default configuration. 
Providers can be added dynamically via UI.", configPath)) - - // Initialize with default configuration - s.ClientConfig = DefaultClientConfig - s.Providers = make(map[schemas.ModelProvider]ProviderConfig) - s.MCPConfig = nil - - // Auto-detect and configure providers from common environment variables - s.autoDetectProviders() - - s.logger.Info("Successfully initialized with default configuration.") - return nil + return s.loadDefaultConfig() } return fmt.Errorf("failed to read config file: %w", err) } @@ -252,6 +257,186 @@ func (s *ConfigStore) LoadFromConfig(configPath string) error { return nil } +// autoMigrate creates/updates the database tables using GORM +func (s *ConfigStore) autoMigrate() error { + return s.db.AutoMigrate( + &DBConfigHash{}, + &DBProvider{}, + &DBKey{}, + &DBMCPClient{}, + &DBClientConfig{}, + &DBEnvKey{}, + ) +} + +// LoadFromDatabase loads initial configuration from the database into memory +// with full preprocessing including environment variable resolution and key config parsing. +// All processing is done upfront to ensure zero latency when retrieving data. +// +// If no configuration exists in the database, the system starts with default configuration +// and users can add providers dynamically via the HTTP API. +// +// This method handles: +// - Database configuration loading +// - Environment variable substitution for API keys (env.VARIABLE_NAME) +// - Key-level config processing for Azure and Vertex (Endpoint, APIVersion, ProjectID, Region, AuthCredentials) +// - Provider-specific meta config processing (Bedrock only) +// - In-memory storage for ultra-fast access during request processing +// - Auto-detection of providers from environment variables if database is empty +func (s *ConfigStore) LoadFromDatabase() error { + s.mu.Lock() + defer s.mu.Unlock() + + s.logger.Info("Loading configuration from database") + + // Load client configuration + if err := s.loadClientConfigFromDB(); err != nil { + s.logger.Warn(fmt.Sprintf("Failed to load client config from database, using defaults: %v", err)) + s.ClientConfig = DefaultClientConfig + } + + // Load providers configuration + if err := s.loadProvidersFromDB(); err != nil { + s.logger.Warn(fmt.Sprintf("Failed to load providers from database: %v", err)) + // Auto-detect providers if database load fails + s.autoDetectProviders() + } + + // Load MCP configuration + if err := s.loadMCPFromDB(); err != nil { + s.logger.Warn(fmt.Sprintf("Failed to load MCP config from database: %v", err)) + s.MCPConfig = nil + } + + // Load environment variable tracking + if err := s.loadEnvKeysFromDB(); err != nil { + s.logger.Warn(fmt.Sprintf("Failed to load env keys from database: %v", err)) + s.EnvKeys = make(map[string][]EnvKeyInfo) + } + + s.logger.Info("Successfully loaded configuration from database.") + return nil +} + +// loadClientConfigFromDB loads client configuration from database +func (s *ConfigStore) loadClientConfigFromDB() error { + var dbConfig DBClientConfig + if err := s.db.First(&dbConfig).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + // No client config in database, use defaults + s.ClientConfig = DefaultClientConfig + return nil + } + return err + } + + s.ClientConfig = ClientConfig{ + DropExcessRequests: dbConfig.DropExcessRequests, + PrometheusLabels: dbConfig.PrometheusLabels, + InitialPoolSize: dbConfig.InitialPoolSize, + EnableLogging: dbConfig.EnableLogging, + } + + return nil +} + +// loadProvidersFromDB loads all providers and their keys from database +func (s *ConfigStore) 
loadProvidersFromDB() error { + var dbProviders []DBProvider + if err := s.db.Preload("Keys").Find(&dbProviders).Error; err != nil { + return err + } + + if len(dbProviders) == 0 { + // No providers in database, auto-detect from environment + s.autoDetectProviders() + return nil + } + + processedProviders := make(map[schemas.ModelProvider]ProviderConfig) + + for _, dbProvider := range dbProviders { + provider := schemas.ModelProvider(dbProvider.Name) + + // Convert database keys to schemas.Key + keys := make([]schemas.Key, len(dbProvider.Keys)) + for i, dbKey := range dbProvider.Keys { + keys[i] = schemas.Key{ + ID: dbKey.KeyID, + Value: dbKey.Value, + Models: dbKey.Models, + Weight: dbKey.Weight, + AzureKeyConfig: dbKey.AzureKeyConfig, + VertexKeyConfig: dbKey.VertexKeyConfig, + } + } + + providerConfig := ProviderConfig{ + Keys: keys, + NetworkConfig: dbProvider.NetworkConfig, + ConcurrencyAndBufferSize: dbProvider.ConcurrencyAndBufferSize, + MetaConfig: dbProvider.MetaConfig, + } + + processedProviders[provider] = providerConfig + } + + s.Providers = processedProviders + return nil +} + +// loadMCPFromDB loads MCP configuration from database +func (s *ConfigStore) loadMCPFromDB() error { + var dbClients []DBMCPClient + if err := s.db.Find(&dbClients).Error; err != nil { + return err + } + + if len(dbClients) == 0 { + s.MCPConfig = nil + return nil + } + + clientConfigs := make([]schemas.MCPClientConfig, len(dbClients)) + for i, dbClient := range dbClients { + clientConfigs[i] = schemas.MCPClientConfig{ + Name: dbClient.Name, + ConnectionType: schemas.MCPConnectionType(dbClient.ConnectionType), + ConnectionString: dbClient.ConnectionString, + StdioConfig: dbClient.StdioConfig, + ToolsToExecute: dbClient.ToolsToExecute, + ToolsToSkip: dbClient.ToolsToSkip, + } + } + + s.MCPConfig = &schemas.MCPConfig{ + ClientConfigs: clientConfigs, + } + + return nil +} + +// loadEnvKeysFromDB loads environment variable tracking from database +func (s *ConfigStore) loadEnvKeysFromDB() error { + var dbEnvKeys []DBEnvKey + if err := s.db.Find(&dbEnvKeys).Error; err != nil { + return err + } + + s.EnvKeys = make(map[string][]EnvKeyInfo) + for _, dbEnvKey := range dbEnvKeys { + s.EnvKeys[dbEnvKey.EnvVar] = append(s.EnvKeys[dbEnvKey.EnvVar], EnvKeyInfo{ + EnvVar: dbEnvKey.EnvVar, + Provider: dbEnvKey.Provider, + KeyType: dbEnvKey.KeyType, + ConfigPath: dbEnvKey.ConfigPath, + KeyID: dbEnvKey.KeyID, + }) + } + + return nil +} + // processEnvValue checks and replaces environment variable references in configuration values. // Returns the processed value and the environment variable name if it was an env reference. // Supports the "env.VARIABLE_NAME" syntax for referencing environment variables. @@ -275,8 +460,6 @@ func (s *ConfigStore) processEnvValue(value string) (string, string, error) { // in the exact same format that LoadFromConfig expects. This enables persistence // of runtime configuration changes with environment variable references restored. 
func (s *ConfigStore) writeConfigToFile(configPath string) error { - s.mu.RLock() - defer s.mu.RUnlock() s.logger.Debug(fmt.Sprintf("Writing current configuration to: %s", configPath)) @@ -505,12 +688,172 @@ func (s *ConfigStore) restoreMetaConfigEnvVars(provider schemas.ModelProvider, m } } -// SaveConfig writes the current configuration back to the original config file path +// SaveConfig writes the current configuration back to the database func (s *ConfigStore) SaveConfig() error { - if s.configPath == "" { - return fmt.Errorf("no config path set - use LoadFromConfig first") + // Save client config + if err := s.saveClientConfigToDB(); err != nil { + return fmt.Errorf("failed to save client config: %w", err) + } + + // Save providers + if err := s.saveProvidersToDB(); err != nil { + return fmt.Errorf("failed to save providers: %w", err) + } + + // Save MCP config + if err := s.saveMCPToDB(); err != nil { + return fmt.Errorf("failed to save MCP config: %w", err) + } + + // Save env keys + if err := s.saveEnvKeysToDB(); err != nil { + return fmt.Errorf("failed to save env keys: %w", err) + } + + return nil +} + +// saveClientConfigToDB saves client configuration to database +func (s *ConfigStore) saveClientConfigToDB() error { + dbConfig := DBClientConfig{ + DropExcessRequests: s.ClientConfig.DropExcessRequests, + InitialPoolSize: s.ClientConfig.InitialPoolSize, + EnableLogging: s.ClientConfig.EnableLogging, + PrometheusLabels: s.ClientConfig.PrometheusLabels, + } + + // Delete existing client config and create new one + if err := s.db.Session(&gorm.Session{AllowGlobalUpdate: true}).Delete(&DBClientConfig{}).Error; err != nil { + return err } - return s.writeConfigToFile(s.configPath) + + return s.db.Create(&dbConfig).Error +} + +// saveProvidersToDB saves all providers and their keys to database +func (s *ConfigStore) saveProvidersToDB() error { + // Delete existing providers and keys (cascade will handle keys) + if err := s.db.Session(&gorm.Session{AllowGlobalUpdate: true}).Delete(&DBProvider{}).Error; err != nil { + return err + } + + for providerName, providerConfig := range s.Providers { + dbProvider := DBProvider{ + Name: string(providerName), + NetworkConfig: providerConfig.NetworkConfig, + ConcurrencyAndBufferSize: providerConfig.ConcurrencyAndBufferSize, + MetaConfig: providerConfig.MetaConfig, + } + + // Create provider first + if err := s.db.Create(&dbProvider).Error; err != nil { + return err + } + + // Create keys for this provider + dbKeys := make([]DBKey, 0, len(providerConfig.Keys)) + for _, key := range providerConfig.Keys { + dbKey := DBKey{ + ProviderID: dbProvider.ID, + KeyID: key.ID, + Value: key.Value, + Models: key.Models, + Weight: key.Weight, + AzureKeyConfig: key.AzureKeyConfig, + VertexKeyConfig: key.VertexKeyConfig, + } + + // Handle Azure config + if key.AzureKeyConfig != nil { + dbKey.AzureEndpoint = &key.AzureKeyConfig.Endpoint + dbKey.AzureAPIVersion = key.AzureKeyConfig.APIVersion + } + + // Handle Vertex config + if key.VertexKeyConfig != nil { + dbKey.VertexProjectID = &key.VertexKeyConfig.ProjectID + dbKey.VertexRegion = &key.VertexKeyConfig.Region + dbKey.VertexAuthCredentials = &key.VertexKeyConfig.AuthCredentials + } + + dbKeys = append(dbKeys, dbKey) + } + + if len(dbKeys) > 0 { + if err := s.db.CreateInBatches(dbKeys, 100).Error; err != nil { + return err + } + } + + } + + return nil +} + +// saveMCPToDB saves MCP configuration to database +func (s *ConfigStore) saveMCPToDB() error { + // Delete existing MCP clients + if err := 
s.db.Session(&gorm.Session{AllowGlobalUpdate: true}).Delete(&DBMCPClient{}).Error; err != nil { + return err + } + + if s.MCPConfig == nil { + return nil + } + + + dbClients := make([]DBMCPClient, 0, len(s.MCPConfig.ClientConfigs)) + for _, clientConfig := range s.MCPConfig.ClientConfigs { + dbClient := DBMCPClient{ + Name: clientConfig.Name, + ConnectionType: string(clientConfig.ConnectionType), + ConnectionString: clientConfig.ConnectionString, + StdioConfig: clientConfig.StdioConfig, + ToolsToExecute: clientConfig.ToolsToExecute, + ToolsToSkip: clientConfig.ToolsToSkip, + } + + dbClients = append(dbClients, dbClient) + } + + if len(dbClients) > 0 { + if err := s.db.CreateInBatches(dbClients, 100).Error; err != nil { + return err + } + } + + return nil +} + +// saveEnvKeysToDB saves environment variable tracking to database +func (s *ConfigStore) saveEnvKeysToDB() error { + // Delete existing env keys + if err := s.db.Session(&gorm.Session{AllowGlobalUpdate: true}).Delete(&DBEnvKey{}).Error; err != nil { + return err + } + + var dbEnvKeys []DBEnvKey + for envVar, infos := range s.EnvKeys { + for _, info := range infos { + dbEnvKey := DBEnvKey{ + EnvVar: envVar, + Provider: info.Provider, + KeyType: info.KeyType, + ConfigPath: info.ConfigPath, + KeyID: info.KeyID, + } + + dbEnvKeys = append(dbEnvKeys, dbEnvKey) + } + } + + if len(dbEnvKeys) > 0 { + if err := s.db.CreateInBatches(dbEnvKeys, 100).Error; err != nil { + return err + } + } + + return nil } // parseMetaConfig converts raw JSON to the appropriate provider-specific meta config interface @@ -1585,3 +1928,196 @@ func (s *ConfigStore) processVertexKeyConfigEnvVars(key *schemas.Key, provider s return nil } + +// LoadConfiguration implements the hybrid file-database configuration loading approach. +// It checks for a config.json file on startup and compares its hash with the stored hash in the database. +// If the hash matches, it loads from the database (fast path). +// If the hash differs or no previous hash exists, it loads from the file and updates the database. +// +// Flow: +// 1. Check if config.json exists in app directory +// 2. If exists: Calculate hash and compare with DB hash +// - Hash matches: Load from DB (fast path) +// - Hash differs: Load from file โ†’ Update DB โ†’ Store new hash +// +// 3. If not exists: Load from DB only (current behavior) +func (s *ConfigStore) LoadConfiguration() error { + s.mu.Lock() + defer s.mu.Unlock() + + s.logger.Info(fmt.Sprintf("Checking for configuration file: %s", s.configPath)) + + // Check if config file exists + if _, err := os.Stat(s.configPath); err == nil { + // File exists - implement hash-based loading + return s.loadWithFileCheck(s.configPath) + } else { + // No file - load from DB only + s.logger.Info("No config.json file found, loading from database only") + return s.loadFromDatabaseInternal() + } +} + +func (s *ConfigStore) loadDefaultConfig() error { + s.logger.Info(fmt.Sprintf("Config file %s not found, starting with default configuration. 
Providers can be added dynamically via UI.", s.configPath)) + + // Initialize with default configuration + s.ClientConfig = DefaultClientConfig + s.Providers = make(map[schemas.ModelProvider]ProviderConfig) + s.MCPConfig = nil + + // Auto-detect and configure providers from common environment variables + s.autoDetectProviders() + + return s.db.Transaction(func(tx *gorm.DB) error { + // Temporarily swap database for transaction + oldDB := s.db + s.db = tx + defer func() { s.db = oldDB }() + + //update database with default config + if err := s.SaveConfig(); err != nil { + return fmt.Errorf("failed to sync to database: %w", err) + } + + if err := s.writeConfigToFile(s.configPath); err != nil { + return fmt.Errorf("failed to write config to file: %w", err) + } + + hash, err := s.calculateFileHash(s.configPath) + if err != nil { + return fmt.Errorf("failed to calculate file hash: %w", err) + } + + if err := s.storeConfigHash(tx, hash); err != nil { + return err + } + + s.logger.Info("Successfully initialized with default configuration.") + return nil + + }) +} + +// loadWithFileCheck implements the hash comparison and loading logic +func (s *ConfigStore) loadWithFileCheck(configFile string) error { + // 1. Calculate current file hash + currentHash, err := s.calculateFileHash(configFile) + if err != nil { + return fmt.Errorf("failed to calculate file hash: %w", err) + } + + // 2. Get latest stored hash from database + var latestHash DBConfigHash + err = s.db.Order("updated_at DESC").First(&latestHash).Error + if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { + return fmt.Errorf("failed to get latest hash from database: %w", err) + } + + // 3. Compare hashes + if err == nil && latestHash.Hash == currentHash { + // Hash matches - load from DB (fast path) + s.logger.Info("Config file unchanged, loading from database") + return s.loadFromDatabaseInternal() + } else { + // Hash differs or no previous hash - load from file + s.logger.Info("Config file changed or no previous hash found, loading from file and updating database") + return s.loadFromFileAndUpdateDB(configFile, currentHash) + } +} + +// calculateFileHash calculates SHA256 hash of the config file +func (s *ConfigStore) calculateFileHash(filePath string) (string, error) { + data, err := os.ReadFile(filePath) + if err != nil { + return "", fmt.Errorf("failed to read file: %w", err) + } + + hash := sha256.Sum256(data) + return hex.EncodeToString(hash[:]), nil +} + +// loadFromFileAndUpdateDB loads configuration from file and updates the database +func (s *ConfigStore) loadFromFileAndUpdateDB(configFile, hash string) error { + // 1. Load config from file using existing LoadFromConfig method + if err := s.LoadFromConfig(configFile); err != nil { + return fmt.Errorf("failed to load from file: %w", err) + } + return s.db.Transaction(func(tx *gorm.DB) error { + // Temporarily swap database for transaction + oldDB := s.db + s.db = tx + defer func() { s.db = oldDB }() + + // 2. 
Update database with file data + if err := s.SaveConfig(); err != nil { + return fmt.Errorf("failed to sync to database: %w", err) + } + + if err := s.storeConfigHash(tx, hash); err != nil { + return err + } + + s.logger.Info(fmt.Sprintf("Successfully loaded configuration from file and updated database with hash: %s", hash[:8])) + return nil + }) +} + +// loadFromDatabaseInternal is the internal version of LoadFromDatabase without locking +// (since LoadConfiguration already holds the lock) +func (s *ConfigStore) loadFromDatabaseInternal() error { + s.logger.Info("Loading configuration from database") + + // Load client configuration + if err := s.loadClientConfigFromDB(); err != nil { + s.logger.Warn(fmt.Sprintf("Failed to load client config from database, using defaults: %v", err)) + s.ClientConfig = DefaultClientConfig + } + + // Load providers configuration + if err := s.loadProvidersFromDB(); err != nil { + s.logger.Warn(fmt.Sprintf("Failed to load providers from database: %v", err)) + // Auto-detect providers if database load fails + s.autoDetectProviders() + } + + // Load MCP configuration + if err := s.loadMCPFromDB(); err != nil { + s.logger.Warn(fmt.Sprintf("Failed to load MCP config from database: %v", err)) + s.MCPConfig = nil + } + + // Load environment variable tracking + if err := s.loadEnvKeysFromDB(); err != nil { + s.logger.Warn(fmt.Sprintf("Failed to load env keys from database: %v", err)) + s.EnvKeys = make(map[string][]EnvKeyInfo) + } + + s.logger.Info("Successfully loaded configuration from database.") + return nil +} + +func (s *ConfigStore) storeConfigHash(tx *gorm.DB, hash string) error { + var existingHash DBConfigHash + if err := tx.Where("hash = ?", hash).First(&existingHash).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + // Hash doesn't exist, create new record + newHash := DBConfigHash{ + Hash: hash, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + if err := tx.Create(&newHash).Error; err != nil { + return fmt.Errorf("failed to store hash in database: %w", err) + } + } else { + return fmt.Errorf("failed to check existing hash: %w", err) + } + } else { + // Hash exists, update the UpdatedAt field + if err := tx.Model(&existingHash).Update("updated_at", time.Now()).Error; err != nil { + return fmt.Errorf("failed to update hash record: %w", err) + } + } + return nil +} diff --git a/transports/bifrost-http/main.go b/transports/bifrost-http/main.go index f927ebd8b6..315f9aa601 100644 --- a/transports/bifrost-http/main.go +++ b/transports/bifrost-http/main.go @@ -72,6 +72,9 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/valyala/fasthttp" "github.com/valyala/fasthttp/fasthttpadaptor" + "gorm.io/driver/sqlite" + "gorm.io/gorm" + gormLogger "gorm.io/gorm/logger" ) //go:embed all:ui @@ -93,7 +96,7 @@ func init() { pluginString := "" flag.StringVar(&port, "port", "8080", "Port to run the server on") - flag.StringVar(&appDir, "app-dir", ".", "Application data directory (contains config.json and logs)") + flag.StringVar(&appDir, "app-dir", "./bifrost-data", "Application data directory (contains config.json and logs)") flag.StringVar(&pluginString, "plugins", "", "Comma separated list of plugins to load") flag.Parse() @@ -238,29 +241,65 @@ func main() { log.Fatalf("failed to create app directory %s: %v", appDir, err) } - // Derive paths from app-dir - configPath := filepath.Join(appDir, "config.json") - logDir := filepath.Join(appDir, "logs") - // Register Prometheus collectors 
registerCollectorSafely(collectors.NewGoCollector()) registerCollectorSafely(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) logger := bifrost.NewDefaultLogger(schemas.LogLevelInfo) - // Initialize high-performance configuration store with caching - store, err := lib.NewConfigStore(logger) + // Initialize separate database connections for optimal performance at scale + configDBPath := filepath.Join(appDir, "config.db") + configFilePath := filepath.Join(appDir, "config.json") + logsDBPath := filepath.Join(appDir, "logs.db") + + // Config database: Optimized for fast reads, rare writes + configDB, err := gorm.Open(sqlite.Open(configDBPath+"?_journal_mode=WAL&_synchronous=NORMAL&_cache_size=1000"), &gorm.Config{ + Logger: gormLogger.Default.LogMode(gormLogger.Silent), + }) + if err != nil { + log.Fatalf("failed to initialize config database: %v", err) + } + + // Configure config database for read-heavy workload + configSQLDB, err := configDB.DB() + if err != nil { + log.Fatalf("failed to get config database: %v", err) + } + configSQLDB.SetMaxIdleConns(2) // Minimal connections for config + configSQLDB.SetMaxOpenConns(5) // Config doesn't need many connections + + // Initialize high-performance configuration store with dedicated database + store, err := lib.NewConfigStore(logger, configDB, configFilePath) if err != nil { log.Fatalf("failed to initialize config store: %v", err) } - // Load configuration from JSON file into the store with full preprocessing - // This processes environment variables and stores all configurations in memory for ultra-fast access - if err := store.LoadFromConfig(configPath); err != nil { + // Load configuration using hybrid file-database approach + // This checks for config.json file, compares hash with database, and loads accordingly + if err := store.LoadConfiguration(); err != nil { log.Fatalf("failed to load config: %v", err) } - // Create account backed by the high-performance store (all processing is done in LoadFromConfig) + // Logs database: Optimized for high-volume writes + var logsDB *gorm.DB + if store.ClientConfig.EnableLogging { + logsDB, err = gorm.Open(sqlite.Open(logsDBPath+"?_journal_mode=WAL&_synchronous=NORMAL&_cache_size=2000&_busy_timeout=30000"), &gorm.Config{ + Logger: gormLogger.Default.LogMode(gormLogger.Silent), + }) + if err != nil { + log.Fatalf("failed to initialize logs database: %v", err) + } + + // Configure logs database for write-heavy workload at scale + logsSQLDB, err := logsDB.DB() + if err != nil { + log.Fatalf("failed to get logs database: %v", err) + } + logsSQLDB.SetMaxIdleConns(20) // Higher for concurrent writes + logsSQLDB.SetMaxOpenConns(50) // Support high concurrent logging + } + + // Create account backed by the high-performance store (all processing is done in LoadFromDatabase) // The account interface now benefits from ultra-fast config access times via in-memory storage account := lib.NewBaseAccount(store) @@ -297,14 +336,9 @@ func main() { var loggingHandler *handlers.LoggingHandler var wsHandler *handlers.WebSocketHandler - if store.ClientConfig.EnableLogging { - // Initialize logging plugin with app-dir based path - loggingConfig := &logging.Config{ - DatabasePath: logDir, - } - - var err error - loggingPlugin, err = logging.NewLoggerPlugin(loggingConfig, logger) + if store.ClientConfig.EnableLogging && logsDB != nil { + // Use dedicated logs database with high-scale optimizations + loggingPlugin, err = logging.NewLoggerPlugin(logsDB, logger) if err != nil { log.Fatalf("failed to initialize 
logging plugin: %v", err) } @@ -334,7 +368,7 @@ func main() { completionHandler := handlers.NewCompletionHandler(client, logger) mcpHandler := handlers.NewMCPHandler(client, logger, store) integrationHandler := handlers.NewIntegrationHandler(client) - configHandler := handlers.NewConfigHandler(client, logger, store, configPath) + configHandler := handlers.NewConfigHandler(client, logger, store) // Set up WebSocket callback for real-time log updates if wsHandler != nil && loggingPlugin != nil { diff --git a/transports/bifrost-http/plugins/logging/main.go b/transports/bifrost-http/plugins/logging/main.go index 0ce7381039..cb529ff7af 100644 --- a/transports/bifrost-http/plugins/logging/main.go +++ b/transports/bifrost-http/plugins/logging/main.go @@ -1,21 +1,17 @@ -// Package logging provides a SQLite-based logging plugin for Bifrost. +// Package logging provides a GORM-based logging plugin for Bifrost. // This plugin stores comprehensive logs of all requests and responses with search, // filter, and pagination capabilities. package logging import ( "context" - "database/sql" "fmt" - "os" - "path/filepath" - "strings" "sync" "sync/atomic" "time" - _ "github.com/mattn/go-sqlite3" "github.com/maximhq/bifrost/core/schemas" + "gorm.io/gorm" ) const ( @@ -37,6 +33,7 @@ const ( // Context keys for logging optimization const ( DroppedCreateContextKey ContextKey = "bifrost-logging-dropped" + CreatedTimestampKey ContextKey = "bifrost-logging-created-timestamp" ) // UpdateLogData contains data for log entry updates @@ -85,30 +82,6 @@ type InitialLogData struct { Tools *[]schemas.Tool } -// LogEntry represents a complete log entry for a request/response cycle -type LogEntry struct { - ID string `json:"id"` - Timestamp time.Time `json:"timestamp"` - Object string `json:"object"` // text.completion, chat.completion, embedding, audio.speech, or audio.transcription - Provider string `json:"provider"` - Model string `json:"model"` - InputHistory []schemas.BifrostMessage `json:"input_history,omitempty"` - OutputMessage *schemas.BifrostMessage `json:"output_message,omitempty"` - Params *schemas.ModelParameters `json:"params,omitempty"` - SpeechInput *schemas.SpeechInput `json:"speech_input,omitempty"` - TranscriptionInput *schemas.TranscriptionInput `json:"transcription_input,omitempty"` - SpeechOutput *schemas.BifrostSpeech `json:"speech_output,omitempty"` - TranscriptionOutput *schemas.BifrostTranscribe `json:"transcription_output,omitempty"` - Tools *[]schemas.Tool `json:"tools,omitempty"` - ToolCalls *[]schemas.ToolCall `json:"tool_calls,omitempty"` - Latency *float64 `json:"latency,omitempty"` - TokenUsage *schemas.LLMUsage `json:"token_usage,omitempty"` - Status string `json:"status"` // "processing", "success", or "error" - ErrorDetails *schemas.BifrostError `json:"error_details,omitempty"` - Stream bool `json:"stream"` // true if this was a streaming response - CreatedAt time.Time `json:"created_at"` -} - // SearchFilters represents the available filters for log searches type SearchFilters struct { Providers []string `json:"providers,omitempty"` @@ -136,18 +109,14 @@ type PaginationOptions struct { type SearchResult struct { Logs []LogEntry `json:"logs"` Pagination PaginationOptions `json:"pagination"` - Stats struct { - TotalRequests int64 `json:"total_requests"` - SuccessRate float64 `json:"success_rate"` // Percentage of successful requests - AverageLatency float64 `json:"average_latency"` // Average latency in milliseconds - TotalTokens int64 `json:"total_tokens"` // Total tokens used - } 
`json:"stats"` + Stats SearchStats `json:"stats"` } -// Config represents the configuration for the logging plugin -type Config struct { - DatabasePath string `json:"database_path"` - // SQLite memory optimization is now handled via connection string parameters +type SearchStats struct { + TotalRequests int64 `json:"total_requests"` + SuccessRate float64 `json:"success_rate"` // Percentage of successful requests + AverageLatency float64 `json:"average_latency"` // Average latency in milliseconds + TotalTokens int64 `json:"total_tokens"` // Total tokens used } // LogCallback is a function that gets called when a new log entry is created @@ -155,8 +124,7 @@ type LogCallback func(*LogEntry) // LoggerPlugin implements the schemas.Plugin interface type LoggerPlugin struct { - config *Config - db *sql.DB + db *gorm.DB mu sync.Mutex done chan struct{} wg sync.WaitGroup @@ -169,40 +137,13 @@ type LoggerPlugin struct { streamDataPool sync.Pool // Pool for reusing StreamUpdateData structs } -// NewLoggerPlugin creates a new logging plugin -func NewLoggerPlugin(config *Config, logger schemas.Logger) (*LoggerPlugin, error) { - if config == nil { - config = &Config{ - DatabasePath: "./bifrost-logs.db", - } - } - - // Handle legacy database path (if it was a directory for BadgerDB) - dbPath := config.DatabasePath - if !strings.HasSuffix(dbPath, ".db") { - dbPath = filepath.Join(dbPath, "logs.db") - } - - // Ensure the directory exists - dbDir := filepath.Dir(dbPath) - if err := os.MkdirAll(dbDir, 0755); err != nil { - return nil, fmt.Errorf("failed to create database directory %s: %w", dbDir, err) - } - - // Open SQLite with optimized settings for low memory usage - db, err := sql.Open("sqlite3", dbPath+"?cache=shared&_journal_mode=WAL&_synchronous=NORMAL&_auto_vacuum=incremental&_page_size=4096&_temp_store=FILE&_mmap_size=0") - if err != nil { - return nil, fmt.Errorf("failed to open SQLite database at %s: %w", dbPath, err) - } - - // Test the connection - if err := db.Ping(); err != nil { - db.Close() - return nil, fmt.Errorf("failed to ping SQLite database: %w", err) +// NewLoggerPlugin creates a new logging plugin with GORM database +func NewLoggerPlugin(db *gorm.DB, logger schemas.Logger) (*LoggerPlugin, error) { + if db == nil { + return nil, fmt.Errorf("GORM database connection cannot be nil") } plugin := &LoggerPlugin{ - config: config, db: db, done: make(chan struct{}), logger: logger, @@ -230,10 +171,9 @@ func NewLoggerPlugin(config *Config, logger schemas.Logger) (*LoggerPlugin, erro plugin.streamDataPool.Put(&StreamUpdateData{}) } - // Create tables and indexes - if err := plugin.createTables(); err != nil { - db.Close() - return nil, fmt.Errorf("failed to create tables: %w", err) + // Auto-migrate tables + if err := plugin.autoMigrate(); err != nil { + return nil, fmt.Errorf("failed to auto-migrate tables: %w", err) } // Start cleanup ticker (runs every 30 seconds) @@ -244,136 +184,11 @@ func NewLoggerPlugin(config *Config, logger schemas.Logger) (*LoggerPlugin, erro return plugin, nil } -// createTables creates the SQLite tables and indexes -func (p *LoggerPlugin) createTables() error { - // Main logs table with updated schema - createTable := ` - CREATE TABLE IF NOT EXISTS logs ( - id TEXT PRIMARY KEY, - timestamp INTEGER, - provider TEXT NOT NULL, - model TEXT NOT NULL, - object_type TEXT NOT NULL, - status TEXT NOT NULL, - latency REAL, - prompt_tokens INTEGER, - completion_tokens INTEGER, - total_tokens INTEGER, - - -- Store complex fields as JSON - input_history TEXT, - output_message 
TEXT, - tools TEXT, - tool_calls TEXT, - params TEXT, - error_details TEXT, - speech_input TEXT, - transcription_input TEXT, - speech_output TEXT, - transcription_output TEXT, - - -- For content search - content_summary TEXT, - - -- Stream indicator - stream BOOLEAN DEFAULT FALSE, - - -- Timestamps for tracking - created_at INTEGER NOT NULL - )` - - if _, err := p.db.Exec(createTable); err != nil { - return fmt.Errorf("failed to create logs table: %w", err) - } - - // Check if we need to add the new columns to existing table - if err := p.migrateTableSchema(); err != nil { - return fmt.Errorf("failed to migrate table schema: %w", err) - } - - // Create indexes for fast filtering - indexes := []string{ - "CREATE INDEX IF NOT EXISTS idx_timestamp ON logs(timestamp)", - "CREATE INDEX IF NOT EXISTS idx_provider ON logs(provider)", - "CREATE INDEX IF NOT EXISTS idx_model ON logs(model)", - "CREATE INDEX IF NOT EXISTS idx_object_type ON logs(object_type)", - "CREATE INDEX IF NOT EXISTS idx_status ON logs(status)", - "CREATE INDEX IF NOT EXISTS idx_created_at ON logs(created_at)", - } - - for _, index := range indexes { - if _, err := p.db.Exec(index); err != nil { - return fmt.Errorf("failed to create index: %w", err) - } - } - - // Check if FTS5 is available - var ftsAvailable bool - err := p.db.QueryRow("SELECT 1 FROM pragma_compile_options WHERE compile_options = 'ENABLE_FTS5'").Scan(&ftsAvailable) - if err != nil { - p.logger.Debug("FTS5 not available for logging, falling back to regular search") - } else { - createFTS := ` - CREATE VIRTUAL TABLE IF NOT EXISTS logs_fts USING fts5( - id, content_summary, content='logs', content_rowid='rowid' - )` - - if _, err := p.db.Exec(createFTS); err != nil { - p.logger.Warn(fmt.Sprintf("Failed to create FTS table, falling back to LIKE search: %v", err)) - } else { - // Create triggers to keep FTS table in sync - triggers := []string{ - `CREATE TRIGGER IF NOT EXISTS logs_fts_insert AFTER INSERT ON logs BEGIN - INSERT INTO logs_fts(id, content_summary) VALUES (new.id, new.content_summary); - END`, - `CREATE TRIGGER IF NOT EXISTS logs_fts_update AFTER UPDATE ON logs BEGIN - UPDATE logs_fts SET content_summary = new.content_summary WHERE id = new.id; - END`, - `CREATE TRIGGER IF NOT EXISTS logs_fts_delete AFTER DELETE ON logs BEGIN - DELETE FROM logs_fts WHERE id = old.id; - END`, - } - - for _, trigger := range triggers { - if _, err := p.db.Exec(trigger); err != nil { - p.logger.Warn(fmt.Sprintf("Failed to create FTS trigger: %v", err)) - } - } - } - } - - return nil -} - -// migrateTableSchema adds new columns if they don't exist -func (p *LoggerPlugin) migrateTableSchema() error { - // List of columns to check and add - columnsToAdd := []struct { - name string - definition string - defaultValue string - }{ - {"created_at", "INTEGER", "0"}, - {"stream", "BOOLEAN", "FALSE"}, - {"speech_input", "TEXT", "NULL"}, - {"transcription_input", "TEXT", "NULL"}, - {"speech_output", "TEXT", "NULL"}, - {"transcription_output", "TEXT", "NULL"}, - } - - for _, column := range columnsToAdd { - var columnExists bool - err := p.db.QueryRow("SELECT COUNT(*) FROM pragma_table_info('logs') WHERE name = ?", column.name).Scan(&columnExists) - if err != nil { - return fmt.Errorf("failed to check for %s column: %w", column.name, err) - } - - if !columnExists { - query := fmt.Sprintf("ALTER TABLE logs ADD COLUMN %s %s DEFAULT %s", column.name, column.definition, column.defaultValue) - if _, err := p.db.Exec(query); err != nil { - return fmt.Errorf("failed to add %s column: %w", 
column.name, err)
-			}
-		}
+// autoMigrate creates/updates the database tables using GORM
+func (p *LoggerPlugin) autoMigrate() error {
+	// Migrate the main logs table
+	if err := p.db.AutoMigrate(&LogEntry{}); err != nil {
+		return err
 	}
 
 	return nil
@@ -394,22 +209,21 @@ func (p *LoggerPlugin) cleanupWorker() {
 	}
 }
 
-// cleanupOldProcessingLogs removes processing logs older than 1 minute
+// cleanupOldProcessingLogs removes processing logs older than 5 minutes
 func (p *LoggerPlugin) cleanupOldProcessingLogs() {
-	// Calculate timestamp for 1 minute ago
-	oneMinuteAgo := time.Now().Add(-1 * time.Minute).UnixNano()
-
-	// Delete processing logs older than 1 minute
-	query := `DELETE FROM logs WHERE status = 'processing' AND created_at < ?`
-	result, err := p.db.Exec(query, oneMinuteAgo)
-	if err != nil {
-		p.logger.Error(fmt.Errorf("failed to cleanup old processing logs: %w", err))
+	// Calculate timestamp for 5 minutes ago
+	fiveMinutesAgo := time.Now().Add(-5 * time.Minute)
+
+	// Delete processing logs older than 5 minutes using GORM
+	result := p.db.Where("status = ? AND created_at < ?", "processing", fiveMinutesAgo).Delete(&LogEntry{})
+	if result.Error != nil {
+		p.logger.Error(fmt.Errorf("failed to cleanup old processing logs: %w", result.Error))
 		return
 	}
 
 	// Log the cleanup activity
-	if rowsAffected, err := result.RowsAffected(); err == nil && rowsAffected > 0 {
-		p.logger.Debug(fmt.Sprintf("Cleaned up %d old processing logs", rowsAffected))
+	if result.RowsAffected > 0 {
+		p.logger.Debug(fmt.Sprintf("Cleaned up %d old processing logs", result.RowsAffected))
 	}
 }
 
@@ -520,11 +334,15 @@ func (p *LoggerPlugin) PreHook(ctx *context.Context, req *schemas.BifrostRequest
 		initialData.Tools = req.Params.Tools
 	}
 
+	// Store created timestamp in context for latency calculation optimization
+	createdTimestamp := time.Now()
+	*ctx = context.WithValue(*ctx, CreatedTimestampKey, createdTimestamp)
+
 	// Queue the log creation message (non-blocking) - Using sync.Pool
 	logMsg := p.getLogMessage()
 	logMsg.Operation = LogOperationCreate
 	logMsg.RequestID = requestID
-	logMsg.Timestamp = time.Now()
+	logMsg.Timestamp = createdTimestamp
 	logMsg.InitialData = initialData
 
 	go func(logMsg *LogMessage) {
@@ -542,11 +360,9 @@ func (p *LoggerPlugin) PreHook(ctx *context.Context, req *schemas.BifrostRequest
 			Object:             logMsg.InitialData.Object,
 			Provider:           logMsg.InitialData.Provider,
 			Model:              logMsg.InitialData.Model,
-			InputHistory:       logMsg.InitialData.InputHistory,
-			Params:             logMsg.InitialData.Params,
-			SpeechInput:        logMsg.InitialData.SpeechInput,
-			TranscriptionInput: logMsg.InitialData.TranscriptionInput,
-			Tools:              logMsg.InitialData.Tools,
+			InputHistoryParsed: logMsg.InitialData.InputHistory,
+			ParamsParsed:       logMsg.InitialData.Params,
+			ToolsParsed:        logMsg.InitialData.Tools,
 			Status:             "processing",
 			Stream:             false, // Initially false, will be updated if streaming
 			CreatedAt:          logMsg.Timestamp,
@@ -590,7 +406,7 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes
 	logMsg.RequestID = requestID
 	logMsg.Timestamp = time.Now()
 
-	if isStreaming {
+	if isStreaming { // NOTE: when a stream ends with an error, isStreaming is false, since there is no way to tell that the error came from the stream.
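+		// A streaming request reaches PostHook once per chunk; each chunk is
+		// folded into the same log row, and status and latency are finalized
+		// only once the final chunk is detected below.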
 		// Handle streaming response with lightweight async pattern
 		logMsg.Operation = LogOperationStreamUpdate
@@ -685,12 +501,17 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes
 
 	// Output message and tool calls
 	if len(result.Choices) > 0 {
-		updateData.OutputMessage = &result.Choices[0].Message
+		choice := result.Choices[0]
+
+		// Check if this is a non-stream response choice
+		if choice.BifrostNonStreamResponseChoice != nil {
+			updateData.OutputMessage = &choice.BifrostNonStreamResponseChoice.Message
 
-		// Extract tool calls if present
-		if result.Choices[0].Message.AssistantMessage != nil &&
-			result.Choices[0].Message.AssistantMessage.ToolCalls != nil {
-			updateData.ToolCalls = result.Choices[0].Message.AssistantMessage.ToolCalls
+			// Extract tool calls if present
+			if choice.BifrostNonStreamResponseChoice.Message.AssistantMessage != nil &&
+				choice.BifrostNonStreamResponseChoice.Message.AssistantMessage.ToolCalls != nil {
+				updateData.ToolCalls = choice.BifrostNonStreamResponseChoice.Message.AssistantMessage.ToolCalls
+			}
 		}
 	}
 
@@ -729,13 +550,26 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes
 		logMsg.UpdateData = updateData
 	}
 
-	isFinalChunk := logMsg.StreamUpdateData != nil &&
-		(logMsg.StreamUpdateData.FinishReason != nil ||
-			(result.Speech != nil && result.Speech.BifrostSpeechStreamResponse != nil && result.Speech.Usage != nil) ||
-			(result.Transcribe != nil && result.Transcribe.BifrostTranscribeStreamResponse != nil && result.Transcribe.Usage != nil))
+	// Calculate isFinalChunk
+	var isFinalChunk bool
+	if logMsg.StreamUpdateData != nil {
+		isFinalChunk = logMsg.StreamUpdateData.FinishReason != nil
+
+		// Check speech streaming completion
+		if !isFinalChunk && result != nil && result.Speech != nil &&
+			result.Speech.BifrostSpeechStreamResponse != nil && result.Speech.Usage != nil {
+			isFinalChunk = true
+		}
+
+		// Check transcription streaming completion
+		if !isFinalChunk && result != nil && result.Transcribe != nil &&
+			result.Transcribe.BifrostTranscribeStreamResponse != nil && result.Transcribe.Usage != nil {
+			isFinalChunk = true
+		}
+	}
 
 	// Both streaming and regular updates now use the same async pattern
-	go func(logMsg *LogMessage, isFinalChunk bool) {
+	go func(logMsg *LogMessage, isFinalChunk bool, ctx context.Context) {
 		defer p.putLogMessage(logMsg) // Return to pool when done
 
 		// Return pooled data structures to their respective pools
@@ -750,9 +584,9 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes
 
 		var processingErr error
 		if logMsg.Operation == LogOperationStreamUpdate {
-			processingErr = p.processStreamUpdate(logMsg.RequestID, logMsg.Timestamp, logMsg.StreamUpdateData, isFinalChunk)
+			processingErr = p.processStreamUpdate(logMsg.RequestID, logMsg.Timestamp, logMsg.StreamUpdateData, isFinalChunk, ctx)
 		} else {
-			processingErr = p.updateLogEntry(logMsg.RequestID, logMsg.Timestamp, logMsg.UpdateData)
+			processingErr = p.updateLogEntry(logMsg.RequestID, logMsg.Timestamp, logMsg.UpdateData, ctx)
 		}
 
 		if processingErr != nil {
@@ -768,7 +602,7 @@ func (p *LoggerPlugin) PostHook(ctx *context.Context, result *schemas.BifrostRes
 			}
 			p.mu.Unlock()
 		}
-	}(logMsg, isFinalChunk)
+	}(logMsg, isFinalChunk, *ctx)
 
 	return result, err, nil
 }
@@ -814,9 +648,47 @@ func (p *LoggerPlugin) Cleanup() error {
 
 	// Wait for the background worker to finish processing remaining items
 	p.wg.Wait()
 
-	// Close the database
-	if p.db != nil {
-		return p.db.Close()
-	}
+	// GORM handles connection cleanup automatically
 	return nil
 }
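+
+// Example wiring (illustrative sketch, not part of this change): the caller now
+// owns the *gorm.DB handed to NewLoggerPlugin. With the SQLite dialector from
+// "gorm.io/driver/sqlite" (an assumed choice; any GORM dialector should work),
+// construction might look like:
+//
+//	db, err := gorm.Open(sqlite.Open("bifrost-data/logs.db"), &gorm.Config{})
+//	if err != nil {
+//		return fmt.Errorf("failed to open logging database: %w", err)
+//	}
+//	plugin, err := NewLoggerPlugin(db, logger)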
+
+// Helper methods
+
+// determineObjectType determines the object type from request input
+func (p *LoggerPlugin) determineObjectType(input schemas.RequestInput) string {
+	if input.ChatCompletionInput != nil {
+		return "chat.completion"
+	}
+	if input.TextCompletionInput != nil {
+		return "text.completion"
+	}
+	if input.EmbeddingInput != nil {
+		return "embedding"
+	}
+	if input.SpeechInput != nil {
+		return "speech"
+	}
+	if input.TranscriptionInput != nil {
+		return "transcription"
+	}
+	return "unknown"
+}
+
+// extractInputHistory extracts input history from request input
+func (p *LoggerPlugin) extractInputHistory(input schemas.RequestInput) []schemas.BifrostMessage {
+	if input.ChatCompletionInput != nil {
+		return *input.ChatCompletionInput
+	}
+	if input.TextCompletionInput != nil {
+		// Convert text completion to message format
+		return []schemas.BifrostMessage{
+			{
+				Role: schemas.ModelChatMessageRoleUser,
+				Content: schemas.MessageContent{
+					ContentStr: input.TextCompletionInput,
+				},
+			},
+		}
+	}
+	return []schemas.BifrostMessage{}
+}
diff --git a/transports/bifrost-http/plugins/logging/models.go b/transports/bifrost-http/plugins/logging/models.go
new file mode 100644
index 0000000000..21d0adaca9
--- /dev/null
+++ b/transports/bifrost-http/plugins/logging/models.go
@@ -0,0 +1,315 @@
+// Package logging provides GORM model definitions and related methods
+package logging
+
+import (
+	"encoding/json"
+	"strings"
+	"time"
+
+	"github.com/maximhq/bifrost/core/schemas"
+	"gorm.io/gorm"
+)
+
+// LogEntry represents a complete log entry for a request/response cycle
+// This is the GORM model with appropriate tags
+type LogEntry struct {
+	ID                  string    `gorm:"primaryKey;type:varchar(255)" json:"id"`
+	Timestamp           time.Time `gorm:"index;not null" json:"timestamp"`
+	Object              string    `gorm:"type:varchar(255);index;not null;column:object_type" json:"object"` // text.completion, chat.completion, embedding, speech, or transcription
+	Provider            string    `gorm:"type:varchar(255);index;not null" json:"provider"`
+	Model               string    `gorm:"type:varchar(255);index;not null" json:"model"`
+	InputHistory        string    `gorm:"type:text" json:"-"` // JSON serialized []schemas.BifrostMessage
+	OutputMessage       string    `gorm:"type:text" json:"-"` // JSON serialized *schemas.BifrostMessage
+	Params              string    `gorm:"type:text" json:"-"` // JSON serialized *schemas.ModelParameters
+	Tools               string    `gorm:"type:text" json:"-"` // JSON serialized *[]schemas.Tool
+	ToolCalls           string    `gorm:"type:text" json:"-"` // JSON serialized *[]schemas.ToolCall
+	SpeechInput         string    `gorm:"type:text" json:"-"` // JSON serialized *schemas.SpeechInput
+	TranscriptionInput  string    `gorm:"type:text" json:"-"` // JSON serialized *schemas.TranscriptionInput
+	SpeechOutput        string    `gorm:"type:text" json:"-"` // JSON serialized *schemas.BifrostSpeech
+	TranscriptionOutput string    `gorm:"type:text" json:"-"` // JSON serialized *schemas.BifrostTranscribe
+	Latency             *float64  `json:"latency,omitempty"`
+	TokenUsage          string    `gorm:"type:text" json:"-"` // JSON serialized *schemas.LLMUsage
+	Status              string    `gorm:"type:varchar(50);index;not null" json:"status"` // "processing", "success", or "error"
+	ErrorDetails        string    `gorm:"type:text" json:"-"` // JSON serialized *schemas.BifrostError
+	Stream              bool      `gorm:"default:false" json:"stream"` // true if this was a streaming response
+	ContentSummary      string    `gorm:"type:text" json:"-"` // For content search
+
+	// Denormalized token fields for easier querying
+	PromptTokens        int       `gorm:"default:0" json:"-"`
+	
CompletionTokens int `gorm:"default:0" json:"-"` + TotalTokens int `gorm:"default:0" json:"-"` + + CreatedAt time.Time `gorm:"index;not null" json:"created_at"` + + // Virtual fields for JSON output - these will be populated when needed + InputHistoryParsed []schemas.BifrostMessage `gorm:"-" json:"input_history,omitempty"` + OutputMessageParsed *schemas.BifrostMessage `gorm:"-" json:"output_message,omitempty"` + ParamsParsed *schemas.ModelParameters `gorm:"-" json:"params,omitempty"` + ToolsParsed *[]schemas.Tool `gorm:"-" json:"tools,omitempty"` + ToolCallsParsed *[]schemas.ToolCall `gorm:"-" json:"tool_calls,omitempty"` + TokenUsageParsed *schemas.LLMUsage `gorm:"-" json:"token_usage,omitempty"` + ErrorDetailsParsed *schemas.BifrostError `gorm:"-" json:"error_details,omitempty"` + SpeechInputParsed *schemas.SpeechInput `gorm:"-" json:"speech_input,omitempty"` + TranscriptionInputParsed *schemas.TranscriptionInput `gorm:"-" json:"transcription_input,omitempty"` + SpeechOutputParsed *schemas.BifrostSpeech `gorm:"-" json:"speech_output,omitempty"` + TranscriptionOutputParsed *schemas.BifrostTranscribe `gorm:"-" json:"transcription_output,omitempty"` +} + +// TableName sets the table name for GORM +func (LogEntry) TableName() string { + return "logs" +} + +// BeforeCreate GORM hook to set created_at and serialize JSON fields +func (l *LogEntry) BeforeCreate(tx *gorm.DB) error { + if l.CreatedAt.IsZero() { + l.CreatedAt = time.Now() + } + return l.serializeFields() +} + +// BeforeSave GORM hook to serialize JSON fields +func (l *LogEntry) BeforeSave(tx *gorm.DB) error { + return l.serializeFields() +} + +// AfterFind GORM hook to deserialize JSON fields +func (l *LogEntry) AfterFind(tx *gorm.DB) error { + return l.deserializeFields() +} + +// serializeFields converts Go structs to JSON strings for storage +func (l *LogEntry) serializeFields() error { + if l.InputHistoryParsed != nil { + if data, err := json.Marshal(l.InputHistoryParsed); err != nil { + return err + } else { + l.InputHistory = string(data) + } + } + + if l.OutputMessageParsed != nil { + if data, err := json.Marshal(l.OutputMessageParsed); err != nil { + return err + } else { + l.OutputMessage = string(data) + } + } + + if l.SpeechInputParsed != nil { + if data, err := json.Marshal(l.SpeechInputParsed); err != nil { + return err + } else { + l.SpeechInput = string(data) + } + } + + if l.TranscriptionInputParsed != nil { + if data, err := json.Marshal(l.TranscriptionInputParsed); err != nil { + return err + } else { + l.TranscriptionInput = string(data) + } + } + + if l.SpeechOutputParsed != nil { + if data, err := json.Marshal(l.SpeechOutputParsed); err != nil { + return err + } else { + l.SpeechOutput = string(data) + } + } + + if l.TranscriptionOutputParsed != nil { + if data, err := json.Marshal(l.TranscriptionOutputParsed); err != nil { + return err + } else { + l.TranscriptionOutput = string(data) + } + } + + if l.ParamsParsed != nil { + if data, err := json.Marshal(l.ParamsParsed); err != nil { + return err + } else { + l.Params = string(data) + } + } + + if l.ToolsParsed != nil { + if data, err := json.Marshal(l.ToolsParsed); err != nil { + return err + } else { + l.Tools = string(data) + } + } + + if l.ToolCallsParsed != nil { + if data, err := json.Marshal(l.ToolCallsParsed); err != nil { + return err + } else { + l.ToolCalls = string(data) + } + } + + if l.TokenUsageParsed != nil { + if data, err := json.Marshal(l.TokenUsageParsed); err != nil { + return err + } else { + l.TokenUsage = string(data) + } + // Update 
denormalized fields for easier querying
+		l.PromptTokens = l.TokenUsageParsed.PromptTokens
+		l.CompletionTokens = l.TokenUsageParsed.CompletionTokens
+		l.TotalTokens = l.TokenUsageParsed.TotalTokens
+	}
+
+	if l.ErrorDetailsParsed != nil {
+		if data, err := json.Marshal(l.ErrorDetailsParsed); err != nil {
+			return err
+		} else {
+			l.ErrorDetails = string(data)
+		}
+	}
+
+	// Build content summary for search
+	l.ContentSummary = l.buildContentSummary()
+
+	return nil
+}
+
+// deserializeFields converts JSON strings back to Go structs
+func (l *LogEntry) deserializeFields() error {
+	if l.InputHistory != "" {
+		if err := json.Unmarshal([]byte(l.InputHistory), &l.InputHistoryParsed); err != nil {
+			// Don't fail the operation on malformed JSON - fall back to an empty slice
+			l.InputHistoryParsed = []schemas.BifrostMessage{}
+		}
+	}
+
+	if l.OutputMessage != "" {
+		if err := json.Unmarshal([]byte(l.OutputMessage), &l.OutputMessageParsed); err != nil {
+			// Don't fail the operation on malformed JSON - leave the field nil
+			l.OutputMessageParsed = nil
+		}
+	}
+
+	if l.Params != "" {
+		if err := json.Unmarshal([]byte(l.Params), &l.ParamsParsed); err != nil {
+			// Don't fail the operation on malformed JSON - leave the field nil
+			l.ParamsParsed = nil
+		}
+	}
+
+	if l.Tools != "" {
+		if err := json.Unmarshal([]byte(l.Tools), &l.ToolsParsed); err != nil {
+			// Don't fail the operation on malformed JSON - leave the field nil
+			l.ToolsParsed = nil
+		}
+	}
+
+	if l.ToolCalls != "" {
+		if err := json.Unmarshal([]byte(l.ToolCalls), &l.ToolCallsParsed); err != nil {
+			// Don't fail the operation on malformed JSON - leave the field nil
+			l.ToolCallsParsed = nil
+		}
+	}
+
+	if l.TokenUsage != "" {
+		if err := json.Unmarshal([]byte(l.TokenUsage), &l.TokenUsageParsed); err != nil {
+			// Don't fail the operation on malformed JSON - leave the field nil
+			l.TokenUsageParsed = nil
+		}
+	}
+
+	if l.ErrorDetails != "" {
+		if err := json.Unmarshal([]byte(l.ErrorDetails), &l.ErrorDetailsParsed); err != nil {
+			// Don't fail the operation on malformed JSON - leave the field nil
+			l.ErrorDetailsParsed = nil
+		}
+	}
+
+	// Deserialize speech and transcription fields
+	if l.SpeechInput != "" {
+		if err := json.Unmarshal([]byte(l.SpeechInput), &l.SpeechInputParsed); err != nil {
+			// Don't fail the operation on malformed JSON - leave the field nil
+			l.SpeechInputParsed = nil
+		}
+	}
+
+	if l.TranscriptionInput != "" {
+		if err := json.Unmarshal([]byte(l.TranscriptionInput), &l.TranscriptionInputParsed); err != nil {
+			// Don't fail the operation on malformed JSON - leave the field nil
+			l.TranscriptionInputParsed = nil
+		}
+	}
+
+	if l.SpeechOutput != "" {
+		if err := json.Unmarshal([]byte(l.SpeechOutput), &l.SpeechOutputParsed); err != nil {
+			// Don't fail the operation on malformed JSON - leave the field nil
+			l.SpeechOutputParsed = nil
+		}
+	}
+
+	if l.TranscriptionOutput != "" {
+		if err := json.Unmarshal([]byte(l.TranscriptionOutput), &l.TranscriptionOutputParsed); err != nil {
+			// Don't fail the operation on malformed JSON - leave the field nil
+			l.TranscriptionOutputParsed = nil
+		}
+	}
+
+	return nil
+}
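+
+// Round-trip sketch (illustrative only, with hypothetical values): callers work
+// with the *Parsed fields and let the GORM hooks above handle the JSON columns:
+//
+//	entry := &LogEntry{ID: "req-1", Provider: "openai", InputHistoryParsed: msgs}
+//	db.Create(entry)                   // BeforeCreate -> serializeFields
+//	var out LogEntry
+//	db.First(&out, "id = ?", "req-1")  // AfterFind -> deserializeFields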
+
+// buildContentSummary creates a searchable text summary
+func (l *LogEntry) buildContentSummary() string {
+	var parts []string
+
+	// Add input messages
+	for _, msg := range l.InputHistoryParsed {
+		// Access content through the Content field
+		if msg.Content.ContentStr != nil && *msg.Content.ContentStr != "" {
+			parts = append(parts, *msg.Content.ContentStr)
+		}
+		// If content blocks exist, extract text from them
+		if msg.Content.ContentBlocks != nil {
+			for _, block := range *msg.Content.ContentBlocks {
+				if block.Text != nil && *block.Text != "" {
+					parts = append(parts, *block.Text)
+				}
+			}
+		}
+	}
+
+	// Add output message
+	if l.OutputMessageParsed != nil {
+		if l.OutputMessageParsed.Content.ContentStr != nil && *l.OutputMessageParsed.Content.ContentStr != "" {
+			parts = append(parts, *l.OutputMessageParsed.Content.ContentStr)
+		}
+		// If content blocks exist, extract text from them
+		if l.OutputMessageParsed.Content.ContentBlocks != nil {
+			for _, block := range *l.OutputMessageParsed.Content.ContentBlocks {
+				if block.Text != nil && *block.Text != "" {
+					parts = append(parts, *block.Text)
+				}
+			}
+		}
+	}
+
+	// Add speech input content
+	if l.SpeechInputParsed != nil && l.SpeechInputParsed.Input != "" {
+		parts = append(parts, l.SpeechInputParsed.Input)
+	}
+
+	// Add transcription output content
+	if l.TranscriptionOutputParsed != nil && l.TranscriptionOutputParsed.Text != "" {
+		parts = append(parts, l.TranscriptionOutputParsed.Text)
+	}
+
+	// Add error details
+	if l.ErrorDetailsParsed != nil && l.ErrorDetailsParsed.Error.Message != "" {
+		parts = append(parts, l.ErrorDetailsParsed.Error.Message)
+	}
+
+	return strings.Join(parts, " ")
+}
diff --git a/transports/bifrost-http/plugins/logging/operations.go b/transports/bifrost-http/plugins/logging/operations.go
new file mode 100644
index 0000000000..55c5bd8928
--- /dev/null
+++ b/transports/bifrost-http/plugins/logging/operations.go
@@ -0,0 +1,476 @@
+// Package logging provides database operations for the GORM-based logging plugin
+package logging
+
+import (
+	"context"
+	"database/sql"
+	"fmt"
+	"time"
+
+	"github.com/maximhq/bifrost/core/schemas"
+	"gorm.io/gorm"
+)
+
+// insertInitialLogEntry creates a new log entry in the database using GORM
+func (p *LoggerPlugin) insertInitialLogEntry(requestID string, timestamp time.Time, data *InitialLogData) error {
+	entry := &LogEntry{
+		ID:        requestID,
+		Timestamp: timestamp,
+		Object:    data.Object,
+		Provider:  data.Provider,
+		Model:     data.Model,
+		Status:    "processing",
+		Stream:    false,
+		CreatedAt: timestamp,
+		// Set parsed fields for serialization
+		InputHistoryParsed:       data.InputHistory,
+		ParamsParsed:             data.Params,
+		ToolsParsed:              data.Tools,
+		SpeechInputParsed:        data.SpeechInput,
+		TranscriptionInputParsed: data.TranscriptionInput,
+	}
+
+	return p.db.Create(entry).Error
+}
+
+// updateLogEntry updates an existing log entry using GORM
+func (p *LoggerPlugin) updateLogEntry(requestID string, timestamp time.Time, data *UpdateLogData, ctx context.Context) error {
+	updates := make(map[string]interface{})
+
+	// Try to get original timestamp from context first for latency calculation
+	latency, err := p.calculateLatency(requestID, timestamp, ctx)
+	if err != nil {
+		return err
+	}
+	updates["latency"] = latency
+
+	updates["status"] = data.Status
+
+	if data.Model != "" {
+		updates["model"] = data.Model
+	}
+
+	if data.Object != "" {
+		updates["object_type"] = data.Object // Note: using object_type for database column
+	}
+
+	// Handle JSON fields by setting them on a temporary entry and serializing
+	tempEntry := &LogEntry{}
+	if data.OutputMessage != nil {
+		tempEntry.OutputMessageParsed = data.OutputMessage
+		if err := tempEntry.serializeFields(); err == nil {
+			updates["output_message"] = tempEntry.OutputMessage
+			updates["content_summary"] = tempEntry.ContentSummary // Update content summary
+		}
+	}
+
+	if data.ToolCalls != nil {
+		tempEntry.ToolCallsParsed = data.ToolCalls
+		if err := tempEntry.serializeFields(); err == nil {
+			
updates["tool_calls"] = tempEntry.ToolCalls + } + } + + if data.SpeechOutput != nil { + tempEntry.SpeechOutputParsed = data.SpeechOutput + if err := tempEntry.serializeFields(); err == nil { + updates["speech_output"] = tempEntry.SpeechOutput + } + } + + if data.TranscriptionOutput != nil { + tempEntry.TranscriptionOutputParsed = data.TranscriptionOutput + if err := tempEntry.serializeFields(); err == nil { + updates["transcription_output"] = tempEntry.TranscriptionOutput + } + } + + if data.TokenUsage != nil { + tempEntry.TokenUsageParsed = data.TokenUsage + if err := tempEntry.serializeFields(); err == nil { + updates["token_usage"] = tempEntry.TokenUsage + updates["prompt_tokens"] = data.TokenUsage.PromptTokens + updates["completion_tokens"] = data.TokenUsage.CompletionTokens + updates["total_tokens"] = data.TokenUsage.TotalTokens + } + } + + if data.ErrorDetails != nil { + tempEntry.ErrorDetailsParsed = data.ErrorDetails + if err := tempEntry.serializeFields(); err == nil { + updates["error_details"] = tempEntry.ErrorDetails + } + } + + return p.db.Model(&LogEntry{}).Where("id = ?", requestID).Updates(updates).Error +} + +// processStreamUpdate handles streaming updates using GORM +func (p *LoggerPlugin) processStreamUpdate(requestID string, timestamp time.Time, data *StreamUpdateData, isFinalChunk bool, ctx context.Context) error { + updates := make(map[string]interface{}) + + // Handle error case first + if data.ErrorDetails != nil { + latency, err := p.calculateLatency(requestID, timestamp, ctx) + if err != nil { + // If we can't get created_at, just update status and error + tempEntry := &LogEntry{} + tempEntry.ErrorDetailsParsed = data.ErrorDetails + if err := tempEntry.serializeFields(); err == nil { + return p.db.Model(&LogEntry{}).Where("id = ?", requestID).Updates(map[string]interface{}{ + "status": "error", + "error_details": tempEntry.ErrorDetails, + }).Error + } + return err + } + + tempEntry := &LogEntry{} + tempEntry.ErrorDetailsParsed = data.ErrorDetails + if err := tempEntry.serializeFields(); err != nil { + return fmt.Errorf("failed to serialize error details: %w", err) + } + return p.db.Model(&LogEntry{}).Where("id = ?", requestID).Updates(map[string]interface{}{ + "status": "error", + "error_details": tempEntry.ErrorDetails, + "latency": latency, + "timestamp": timestamp, + }).Error + } + + // Always mark as streaming and update timestamp + updates["stream"] = true + updates["timestamp"] = timestamp + + // Calculate latency when stream finishes + var needsLatency bool + var latency float64 + + if isFinalChunk { + // Stream is finishing, calculate latency + var err error + latency, err = p.calculateLatency(requestID, timestamp, ctx) + if err != nil { + return fmt.Errorf("failed to get created_at for latency calculation: %w", err) + } + needsLatency = true + } + + // Add latency if this is the final chunk + if needsLatency { + updates["latency"] = latency + } + + // Update model if provided + if data.Model != "" { + updates["model"] = data.Model + } + + // Update object type if provided + if data.Object != "" { + updates["object_type"] = data.Object // Note: using object_type for database column + } + + // Update token usage if provided + if data.TokenUsage != nil { + tempEntry := &LogEntry{} + tempEntry.TokenUsageParsed = data.TokenUsage + if err := tempEntry.serializeFields(); err == nil { + updates["token_usage"] = tempEntry.TokenUsage + updates["prompt_tokens"] = data.TokenUsage.PromptTokens + updates["completion_tokens"] = data.TokenUsage.CompletionTokens + 
updates["total_tokens"] = data.TokenUsage.TotalTokens + } + } + + // Handle finish reason - if present, mark as complete + if isFinalChunk { + updates["status"] = "success" + } + + // Process delta content and tool calls if present + if data.Delta != nil { + deltaUpdates, err := p.prepareDeltaUpdates(requestID, data.Delta) + if err != nil { + return fmt.Errorf("failed to prepare delta updates: %w", err) + } + // Merge delta updates into main updates + for key, value := range deltaUpdates { + updates[key] = value + } + } + + // Handle transcription output from stream updates + if data.TranscriptionOutput != nil { + tempEntry := &LogEntry{} + tempEntry.TranscriptionOutputParsed = data.TranscriptionOutput + if err := tempEntry.serializeFields(); err != nil { + return fmt.Errorf("failed to serialize transcription output: %w", err) + } + updates["transcription_output"] = tempEntry.TranscriptionOutput + } + + // Only perform update if there's something to update + if len(updates) > 0 { + return p.db.Model(&LogEntry{}).Where("id = ?", requestID).Updates(updates).Error + } + + return nil +} + +// calculateLatency computes latency in milliseconds from creation time +func (p *LoggerPlugin) calculateLatency(requestID string, currentTime time.Time, ctx context.Context) (float64, error) { + // Try to get original timestamp from context first + if ctxTimestamp, ok := ctx.Value(CreatedTimestampKey).(time.Time); ok { + return float64(currentTime.Sub(ctxTimestamp).Nanoseconds()) / 1e6, nil + } + + // Fallback to database query if not found in context + var originalEntry LogEntry + if err := p.db.Select("created_at").Where("id = ?", requestID).First(&originalEntry).Error; err != nil { + return 0, err + } + return float64(currentTime.Sub(originalEntry.CreatedAt).Nanoseconds()) / 1e6, nil +} + +// prepareDeltaUpdates prepares updates for streaming delta content without executing them +func (p *LoggerPlugin) prepareDeltaUpdates(requestID string, delta *schemas.BifrostStreamDelta) (map[string]interface{}, error) { + // Only fetch existing content if we have content or tool calls to append + if (delta.Content == nil || *delta.Content == "") && len(delta.ToolCalls) == 0 && delta.Refusal == nil { + return map[string]interface{}{}, nil + } + + // Get current entry + var currentEntry LogEntry + if err := p.db.Where("id = ?", requestID).First(¤tEntry).Error; err != nil { + return nil, fmt.Errorf("failed to get existing entry: %w", err) + } + + // Parse existing message or create new one + var outputMessage *schemas.BifrostMessage + if currentEntry.OutputMessage != "" { + outputMessage = &schemas.BifrostMessage{} + // Attempt to deserialize; use parsed message only if successful + if err := currentEntry.deserializeFields(); err == nil && currentEntry.OutputMessageParsed != nil { + outputMessage = currentEntry.OutputMessageParsed + } else { + // Create new message if parsing fails + outputMessage = &schemas.BifrostMessage{ + Role: schemas.ModelChatMessageRoleAssistant, + Content: schemas.MessageContent{}, + } + } + } else { + // Create new message + outputMessage = &schemas.BifrostMessage{ + Role: schemas.ModelChatMessageRoleAssistant, + Content: schemas.MessageContent{}, + } + } + + // Handle role (usually in first chunk) + if delta.Role != nil { + outputMessage.Role = schemas.ModelChatMessageRole(*delta.Role) + } + + // Append content + if delta.Content != nil && *delta.Content != "" { + p.appendContentToMessage(outputMessage, *delta.Content) + } + + // Handle refusal + if delta.Refusal != nil && *delta.Refusal != "" 
{ + if outputMessage.AssistantMessage == nil { + outputMessage.AssistantMessage = &schemas.AssistantMessage{} + } + if outputMessage.AssistantMessage.Refusal == nil { + outputMessage.AssistantMessage.Refusal = delta.Refusal + } else { + *outputMessage.AssistantMessage.Refusal += *delta.Refusal + } + } + + // Accumulate tool calls + if len(delta.ToolCalls) > 0 { + p.accumulateToolCallsInMessage(outputMessage, delta.ToolCalls) + } + + // Update the database with new content + tempEntry := &LogEntry{ + OutputMessageParsed: outputMessage, + } + if outputMessage.AssistantMessage != nil && outputMessage.AssistantMessage.ToolCalls != nil { + tempEntry.ToolCallsParsed = outputMessage.AssistantMessage.ToolCalls + } + + if err := tempEntry.serializeFields(); err != nil { + return nil, fmt.Errorf("failed to serialize fields: %w", err) + } + + updates := map[string]interface{}{ + "output_message": tempEntry.OutputMessage, + "content_summary": tempEntry.ContentSummary, + } + + // Also update tool_calls field for backward compatibility + if tempEntry.ToolCalls != "" { + updates["tool_calls"] = tempEntry.ToolCalls + } + + return updates, nil +} + +// getLogEntry retrieves a log entry by ID using GORM +func (p *LoggerPlugin) getLogEntry(requestID string) (*LogEntry, error) { + var entry LogEntry + err := p.db.Where("id = ?", requestID).First(&entry).Error + if err != nil { + return nil, err + } + return &entry, nil +} + +// SearchLogs searches logs with filters and pagination using GORM +func (p *LoggerPlugin) SearchLogs(filters SearchFilters, pagination PaginationOptions) (*SearchResult, error) { + // Set default pagination if not provided + if pagination.Limit == 0 { + pagination.Limit = 50 + } + if pagination.SortBy == "" { + pagination.SortBy = "timestamp" + } + if pagination.Order == "" { + pagination.Order = "desc" + } + + // Build base query with all filters applied + baseQuery := p.db.Model(&LogEntry{}) + + // Apply filters efficiently + if len(filters.Providers) > 0 { + baseQuery = baseQuery.Where("provider IN ?", filters.Providers) + } + if len(filters.Models) > 0 { + baseQuery = baseQuery.Where("model IN ?", filters.Models) + } + if len(filters.Status) > 0 { + baseQuery = baseQuery.Where("status IN ?", filters.Status) + } + if len(filters.Objects) > 0 { + baseQuery = baseQuery.Where("object_type IN ?", filters.Objects) + } + if filters.StartTime != nil { + baseQuery = baseQuery.Where("timestamp >= ?", *filters.StartTime) + } + if filters.EndTime != nil { + baseQuery = baseQuery.Where("timestamp <= ?", *filters.EndTime) + } + if filters.MinLatency != nil { + baseQuery = baseQuery.Where("latency >= ?", *filters.MinLatency) + } + if filters.MaxLatency != nil { + baseQuery = baseQuery.Where("latency <= ?", *filters.MaxLatency) + } + if filters.MinTokens != nil { + baseQuery = baseQuery.Where("total_tokens >= ?", *filters.MinTokens) + } + if filters.MaxTokens != nil { + baseQuery = baseQuery.Where("total_tokens <= ?", *filters.MaxTokens) + } + if filters.ContentSearch != "" { + baseQuery = baseQuery.Where("content_summary LIKE ?", "%"+filters.ContentSearch+"%") + } + + // Get total count + var totalCount int64 + if err := baseQuery.Count(&totalCount).Error; err != nil { + return nil, err + } + + // Initialize stats + stats := SearchStats{} + + // Calculate statistics efficiently if we have data + if totalCount > 0 { + // Total requests should include all requests (processing, success, error) + stats.TotalRequests = totalCount + + // Get completed requests count (success + error, excluding 
processing) for success rate calculation + var completedCount int64 + completedQuery := baseQuery.Session(&gorm.Session{}) + if err := completedQuery.Where("status IN ?", []string{"success", "error"}).Count(&completedCount).Error; err != nil { + return nil, err + } + + if completedCount > 0 { + // Calculate success rate based on completed requests only + var successCount int64 + successQuery := baseQuery.Session(&gorm.Session{}) + if err := successQuery.Where("status = ?", "success").Count(&successCount).Error; err != nil { + return nil, err + } + stats.SuccessRate = float64(successCount) / float64(completedCount) * 100 + + // Calculate average latency and total tokens in a single query for better performance + var result struct { + AvgLatency sql.NullFloat64 `json:"avg_latency"` + TotalTokens sql.NullInt64 `json:"total_tokens"` + } + + statsQuery := baseQuery.Session(&gorm.Session{}) + if err := statsQuery.Select("AVG(latency) as avg_latency, SUM(total_tokens) as total_tokens").Scan(&result).Error; err != nil { + return nil, err + } + + if result.AvgLatency.Valid { + stats.AverageLatency = result.AvgLatency.Float64 + } + if result.TotalTokens.Valid { + stats.TotalTokens = result.TotalTokens.Int64 + } + } + } + + // Build order clause + direction := "DESC" + if pagination.Order == "asc" { + direction = "ASC" + } + + var orderClause string + switch pagination.SortBy { + case "timestamp": + orderClause = "timestamp " + direction + case "latency": + orderClause = "latency " + direction + case "tokens": + orderClause = "total_tokens " + direction + default: + orderClause = "timestamp " + direction + } + + // Execute main query with sorting and pagination + var logs []LogEntry + mainQuery := baseQuery.Order(orderClause) + + if pagination.Limit > 0 { + mainQuery = mainQuery.Limit(pagination.Limit) + } + if pagination.Offset > 0 { + mainQuery = mainQuery.Offset(pagination.Offset) + } + + if err := mainQuery.Find(&logs).Error; err != nil { + return nil, err + } + + return &SearchResult{ + Logs: logs, + Pagination: pagination, + Stats: stats, + }, nil +} + + diff --git a/transports/bifrost-http/plugins/logging/streaming.go b/transports/bifrost-http/plugins/logging/streaming.go new file mode 100644 index 0000000000..53551a1907 --- /dev/null +++ b/transports/bifrost-http/plugins/logging/streaming.go @@ -0,0 +1,72 @@ +// Package logging provides streaming-related functionality for the GORM-based logging plugin +package logging + +import ( + "github.com/maximhq/bifrost/core/schemas" +) + +// appendContentToMessage efficiently appends content to a message +func (p *LoggerPlugin) appendContentToMessage(message *schemas.BifrostMessage, newContent string) { + if message == nil { + return + } + if message.Content.ContentStr != nil { + // Append to existing string content + *message.Content.ContentStr += newContent + } else if message.Content.ContentBlocks != nil { + // Find the last text block and append, or create new one + blocks := *message.Content.ContentBlocks + if len(blocks) > 0 && blocks[len(blocks)-1].Type == schemas.ContentBlockTypeText && blocks[len(blocks)-1].Text != nil { + // Append to last text block + *blocks[len(blocks)-1].Text += newContent + } else { + // Create new text block + blocks = append(blocks, schemas.ContentBlock{ + Type: schemas.ContentBlockTypeText, + Text: &newContent, + }) + message.Content.ContentBlocks = &blocks + } + } else { + // Initialize with string content + message.Content.ContentStr = &newContent + } +} + +// accumulateToolCallsInMessage efficiently accumulates 
tool calls in a message +func (p *LoggerPlugin) accumulateToolCallsInMessage(message *schemas.BifrostMessage, deltaToolCalls []schemas.ToolCall) { + if message == nil { + return + } + if message.AssistantMessage == nil { + message.AssistantMessage = &schemas.AssistantMessage{} + } + + if message.AssistantMessage.ToolCalls == nil { + message.AssistantMessage.ToolCalls = &[]schemas.ToolCall{} + } + + existingToolCalls := *message.AssistantMessage.ToolCalls + + for _, deltaToolCall := range deltaToolCalls { + // Find existing tool call with same ID or create new one + found := false + for i := range existingToolCalls { + if existingToolCalls[i].ID != nil && deltaToolCall.ID != nil && + *existingToolCalls[i].ID == *deltaToolCall.ID { + // Append arguments to existing tool call + existingToolCalls[i].Function.Arguments += deltaToolCall.Function.Arguments + found = true + break + } + } + + if !found { + // Add new tool call + existingToolCalls = append(existingToolCalls, deltaToolCall) + } + } + + message.AssistantMessage.ToolCalls = &existingToolCalls +} + diff --git a/transports/bifrost-http/plugins/logging/utils.go b/transports/bifrost-http/plugins/logging/utils.go index bbac663b0c..39d4907be7 100644 --- a/transports/bifrost-http/plugins/logging/utils.go +++ b/transports/bifrost-http/plugins/logging/utils.go @@ -1,942 +1,7 @@ +// Package logging provides utility functions and interfaces for the GORM-based logging plugin package logging -import ( - "database/sql" - "encoding/json" - "fmt" - "strings" - "time" - - "github.com/maximhq/bifrost/core/schemas" -) - -// insertInitialLogEntry stores an initial log entry in SQLite using the new async data structure -func (p *LoggerPlugin) insertInitialLogEntry(requestID string, timestamp time.Time, data *InitialLogData) error { - // Serialize complex fields to JSON - inputHistoryJSON, _ := json.Marshal(data.InputHistory) - toolsJSON, _ := json.Marshal(data.Tools) - paramsJSON, _ := json.Marshal(data.Params) - speechInputJSON, _ := json.Marshal(data.SpeechInput) - transcriptionInputJSON, _ := json.Marshal(data.TranscriptionInput) - - // Create content summary for searching - contentSummary := p.createContentSummaryFromInitialData(data) - - // Insert into main table - query := ` - INSERT INTO logs ( - id, provider, model, object_type, status, - input_history, tools, params, speech_input, transcription_input, - content_summary, stream, created_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` - - _, err := p.db.Exec(query, - requestID, data.Provider, data.Model, - data.Object, "processing", - string(inputHistoryJSON), string(toolsJSON), string(paramsJSON), - string(speechInputJSON), string(transcriptionInputJSON), - contentSummary, false, timestamp.UnixNano()) - - if err != nil { - return fmt.Errorf("failed to insert initial log entry: %w", err) - } - - return nil -} - -// updateLogEntry updates an existing log entry with new data using the new async data structure -func (p *LoggerPlugin) updateLogEntry(requestID string, timestamp time.Time, data *UpdateLogData) error { - // First, get the created_at timestamp to calculate latency - var createdAtUnix int64 - err := p.db.QueryRow("SELECT created_at FROM logs WHERE id = ?", requestID).Scan(&createdAtUnix) - if err != nil { - return fmt.Errorf("failed to get created_at for latency calculation: %w", err) - } - - createdAt := time.Unix(createdAtUnix/1e9, createdAtUnix%1e9) // Convert from nanoseconds - latency := float64(timestamp.Sub(createdAt).Milliseconds()) - - // Build dynamic UPDATE query - var 
setParts []string - var args []interface{} - - // Update request timestamp - setParts = append(setParts, "timestamp = ?") - args = append(args, timestamp.UnixNano()) - - // Always update latency - setParts = append(setParts, "latency = ?") - args = append(args, latency) - - // Update status - if data.Status != "" { - setParts = append(setParts, "status = ?") - args = append(args, data.Status) - } - - // Update model if provided - if data.Model != "" { - setParts = append(setParts, "model = ?") - args = append(args, data.Model) - } - - // Update object type if provided - if data.Object != "" { - setParts = append(setParts, "object_type = ?") - args = append(args, data.Object) - } - - // Update token usage - if data.TokenUsage != nil { - setParts = append(setParts, "prompt_tokens = ?, completion_tokens = ?, total_tokens = ?") - args = append(args, data.TokenUsage.PromptTokens, data.TokenUsage.CompletionTokens, data.TokenUsage.TotalTokens) - } - - // Update output message - if data.OutputMessage != nil { - outputMessageJSON, _ := json.Marshal(data.OutputMessage) - setParts = append(setParts, "output_message = ?") - args = append(args, string(outputMessageJSON)) - } - - // Update tool calls - if data.ToolCalls != nil { - toolCallsJSON, _ := json.Marshal(data.ToolCalls) - setParts = append(setParts, "tool_calls = ?") - args = append(args, string(toolCallsJSON)) - } - - // Update error details - if data.ErrorDetails != nil { - errorDetailsJSON, _ := json.Marshal(data.ErrorDetails) - setParts = append(setParts, "error_details = ?") - args = append(args, string(errorDetailsJSON)) - } - - // Update speech output (for non-streaming) - if data.SpeechOutput != nil { - speechOutputJSON, _ := json.Marshal(data.SpeechOutput) - setParts = append(setParts, "speech_output = ?") - args = append(args, string(speechOutputJSON)) - } - - // Update transcription output (for non-streaming) - if data.TranscriptionOutput != nil { - transcriptionOutputJSON, _ := json.Marshal(data.TranscriptionOutput) - setParts = append(setParts, "transcription_output = ?") - args = append(args, string(transcriptionOutputJSON)) - } - - // Add the WHERE clause parameter - args = append(args, requestID) - - query := fmt.Sprintf("UPDATE logs SET %s WHERE id = ?", strings.Join(setParts, ", ")) - - // Update content summary if we have new content - if data.OutputMessage != nil { - // Get current log entry to rebuild content summary - if currentEntry, err := p.getLogEntry(requestID); err == nil { - newContentSummary := p.createContentSummary(currentEntry) - query = strings.Replace(query, "WHERE id = ?", ", content_summary = ? WHERE id = ?", 1) - // Insert content_summary before the requestID in args - args = append(args[:len(args)-1], newContentSummary, args[len(args)-1]) - } - } - - _, err = p.db.Exec(query, args...) 
- if err != nil { - return fmt.Errorf("failed to update log entry: %w", err) - } - - return nil -} - -// processStreamUpdate handles streaming delta updates efficiently with minimal database operations -func (p *LoggerPlugin) processStreamUpdate(requestID string, timestamp time.Time, data *StreamUpdateData, isFinalChunk bool) error { - // Handle error case first - if data.ErrorDetails != nil { - // For errors, we should also calculate latency - // Get created_at timestamp to calculate latency - var createdAtUnix int64 - err := p.db.QueryRow("SELECT created_at FROM logs WHERE id = ?", requestID).Scan(&createdAtUnix) - if err != nil { - // If we can't get created_at, just update status and error - query := "UPDATE logs SET status = ?, error_details = ? WHERE id = ?" - errorDetailsJSON, _ := json.Marshal(data.ErrorDetails) - _, execErr := p.db.Exec(query, "error", string(errorDetailsJSON), requestID) - return execErr - } - - createdAt := time.Unix(createdAtUnix/1e9, createdAtUnix%1e9) - latency := float64(timestamp.Sub(createdAt).Milliseconds()) - - query := "UPDATE logs SET status = ?, error_details = ?, latency = ?, timestamp = ? WHERE id = ?" - errorDetailsJSON, _ := json.Marshal(data.ErrorDetails) - _, err = p.db.Exec(query, "error", string(errorDetailsJSON), latency, timestamp.UnixNano(), requestID) - return err - } - - // For streaming updates, we need to calculate latency when the stream finishes - var needsLatency bool - var latency float64 - - if isFinalChunk { - // Stream is finishing, calculate latency - var createdAtUnix int64 - err := p.db.QueryRow("SELECT created_at FROM logs WHERE id = ?", requestID).Scan(&createdAtUnix) - if err != nil { - return fmt.Errorf("failed to get created_at for latency calculation: %w", err) - } - - createdAt := time.Unix(createdAtUnix/1e9, createdAtUnix%1e9) - latency = float64(timestamp.Sub(createdAt).Milliseconds()) - needsLatency = true - } - - // Build dynamic UPDATE query for streaming data - var setParts []string - var args []interface{} - - // Always mark as streaming and update timestamp - setParts = append(setParts, "stream = ?, timestamp = ?") - args = append(args, true, timestamp.UnixNano()) - - // Add latency if this is the final chunk - if needsLatency { - setParts = append(setParts, "latency = ?") - args = append(args, latency) - } - - // Update model if provided - if data.Model != "" { - setParts = append(setParts, "model = ?") - args = append(args, data.Model) - } - - // Update object type if provided - if data.Object != "" { - setParts = append(setParts, "object_type = ?") - args = append(args, data.Object) - } - - // Update token usage if provided - if data.TokenUsage != nil { - setParts = append(setParts, "prompt_tokens = ?, completion_tokens = ?, total_tokens = ?") - args = append(args, data.TokenUsage.PromptTokens, data.TokenUsage.CompletionTokens, data.TokenUsage.TotalTokens) - } - - // Handle finish reason - if present, mark as complete - if isFinalChunk { - setParts = append(setParts, "status = ?") - args = append(args, "success") - } - - // Process delta content and tool calls if present - if data.Delta != nil { - if err := p.appendDeltaToEntry(requestID, data.Delta, &setParts, &args); err != nil { - return fmt.Errorf("failed to append delta: %w", err) - } - } - - // Handle transcription output from stream updates - if data.TranscriptionOutput != nil { - transcriptionOutputJSON, err := json.Marshal(data.TranscriptionOutput) - if err != nil { - return fmt.Errorf("failed to marshal transcription output: %w", err) - } - - 
setParts = append(setParts, "transcription_output = ?") - args = append(args, string(transcriptionOutputJSON)) - } - - // Only perform update if there's something to update - if len(setParts) > 0 { - // Add WHERE clause parameter - args = append(args, requestID) - - query := fmt.Sprintf("UPDATE logs SET %s WHERE id = ?", strings.Join(setParts, ", ")) - _, err := p.db.Exec(query, args...) - if err != nil { - return fmt.Errorf("failed to update streaming log entry: %w", err) - } - } - - return nil -} - -// appendDeltaToEntry efficiently appends streaming delta content to existing database entry -func (p *LoggerPlugin) appendDeltaToEntry(requestID string, delta *schemas.BifrostStreamDelta, setParts *[]string, args *[]interface{}) error { - // Only fetch existing content if we have content or tool calls to append - if (delta.Content == nil || *delta.Content == "") && len(delta.ToolCalls) == 0 && delta.Refusal == nil { - return nil - } - - // Get only the necessary fields from existing entry - var outputMessageJSON sql.NullString - err := p.db.QueryRow("SELECT output_message FROM logs WHERE id = ?", requestID).Scan(&outputMessageJSON) - if err != nil { - return fmt.Errorf("failed to get existing output message: %w", err) - } - - // Parse existing message or create new one - var outputMessage *schemas.BifrostMessage - if outputMessageJSON.Valid && outputMessageJSON.String != "null" && outputMessageJSON.String != "" { - outputMessage = &schemas.BifrostMessage{} - if err := json.Unmarshal([]byte(outputMessageJSON.String), outputMessage); err != nil { - // If unmarshaling fails, create new message - outputMessage = &schemas.BifrostMessage{ - Role: schemas.ModelChatMessageRoleAssistant, - Content: schemas.MessageContent{}, - } - } - } else { - // Create new message - outputMessage = &schemas.BifrostMessage{ - Role: schemas.ModelChatMessageRoleAssistant, - Content: schemas.MessageContent{}, - } - } - - // Handle role (usually in first chunk) - if delta.Role != nil { - outputMessage.Role = schemas.ModelChatMessageRole(*delta.Role) - } - - // Append content - if delta.Content != nil && *delta.Content != "" { - p.appendContentToMessage(outputMessage, *delta.Content) - } - - // Handle refusal - if delta.Refusal != nil && *delta.Refusal != "" { - if outputMessage.AssistantMessage == nil { - outputMessage.AssistantMessage = &schemas.AssistantMessage{} - } - if outputMessage.AssistantMessage.Refusal == nil { - outputMessage.AssistantMessage.Refusal = delta.Refusal - } else { - *outputMessage.AssistantMessage.Refusal += *delta.Refusal - } - } - - // Accumulate tool calls - if len(delta.ToolCalls) > 0 { - p.accumulateToolCallsInMessage(outputMessage, delta.ToolCalls) - } - - // Update the database fields - updatedMessageJSON, _ := json.Marshal(outputMessage) - *setParts = append(*setParts, "output_message = ?") - *args = append(*args, string(updatedMessageJSON)) - - // Also update tool_calls field for backward compatibility - if outputMessage.AssistantMessage != nil && outputMessage.AssistantMessage.ToolCalls != nil { - toolCallsJSON, _ := json.Marshal(outputMessage.AssistantMessage.ToolCalls) - *setParts = append(*setParts, "tool_calls = ?") - *args = append(*args, string(toolCallsJSON)) - } - - return nil -} - -// appendContentToMessage efficiently appends content to a message -func (p *LoggerPlugin) appendContentToMessage(message *schemas.BifrostMessage, newContent string) { - if message.Content.ContentStr != nil { - // Append to existing string content - *message.Content.ContentStr += newContent - } else 
if message.Content.ContentBlocks != nil { - // Find the last text block and append, or create new one - blocks := *message.Content.ContentBlocks - if len(blocks) > 0 && blocks[len(blocks)-1].Type == schemas.ContentBlockTypeText && blocks[len(blocks)-1].Text != nil { - // Append to last text block - *blocks[len(blocks)-1].Text += newContent - } else { - // Create new text block - blocks = append(blocks, schemas.ContentBlock{ - Type: schemas.ContentBlockTypeText, - Text: &newContent, - }) - message.Content.ContentBlocks = &blocks - } - } else { - // Initialize with string content - message.Content.ContentStr = &newContent - } -} - -// accumulateToolCallsInMessage efficiently accumulates tool calls in a message -func (p *LoggerPlugin) accumulateToolCallsInMessage(message *schemas.BifrostMessage, deltaToolCalls []schemas.ToolCall) { - if message.AssistantMessage == nil { - message.AssistantMessage = &schemas.AssistantMessage{} - } - - if message.AssistantMessage.ToolCalls == nil { - message.AssistantMessage.ToolCalls = &[]schemas.ToolCall{} - } - - existingToolCalls := *message.AssistantMessage.ToolCalls - - for _, deltaToolCall := range deltaToolCalls { - // Find existing tool call with same ID or create new one - found := false - for i := range existingToolCalls { - if existingToolCalls[i].ID != nil && deltaToolCall.ID != nil && - *existingToolCalls[i].ID == *deltaToolCall.ID { - // Append arguments to existing tool call - existingToolCalls[i].Function.Arguments += deltaToolCall.Function.Arguments - found = true - break - } - } - - if !found { - // Add new tool call - existingToolCalls = append(existingToolCalls, deltaToolCall) - } - } - - message.AssistantMessage.ToolCalls = &existingToolCalls -} - -// createContentSummaryFromInitialData creates a searchable content summary from initial log data -func (p *LoggerPlugin) createContentSummaryFromInitialData(data *InitialLogData) string { - var parts []string - - // Add input history content - for _, msg := range data.InputHistory { - if msg.Content.ContentStr != nil { - parts = append(parts, *msg.Content.ContentStr) - } - } - - // Add speech input content - if data.SpeechInput != nil && data.SpeechInput.Input != "" { - parts = append(parts, data.SpeechInput.Input) - if data.SpeechInput.Instructions != "" { - parts = append(parts, data.SpeechInput.Instructions) - } - } - - // Add transcription input prompt if available - if data.TranscriptionInput != nil && data.TranscriptionInput.Prompt != nil && *data.TranscriptionInput.Prompt != "" { - parts = append(parts, *data.TranscriptionInput.Prompt) - } - - return strings.Join(parts, " ") -} - -// getLogEntry retrieves a complete log entry by ID -func (p *LoggerPlugin) getLogEntry(requestID string) (*LogEntry, error) { - query := ` - SELECT id, timestamp, provider, model, object_type, status, latency, - prompt_tokens, completion_tokens, total_tokens, - input_history, output_message, tools, tool_calls, - params, error_details, speech_input, transcription_input, - speech_output, transcription_output, stream, created_at - FROM logs WHERE id = ?` - - var entry LogEntry - var timestampUnix, createdAtUnix int64 - var inputHistoryJSON, outputMessageJSON, toolsJSON, toolCallsJSON sql.NullString - var paramsJSON, errorDetailsJSON sql.NullString - var speechInputJSON, transcriptionInputJSON sql.NullString - var speechOutputJSON, transcriptionOutputJSON sql.NullString - var promptTokens, completionTokens, totalTokensRow sql.NullInt64 - var latency sql.NullFloat64 - var stream sql.NullBool - - err := 
p.db.QueryRow(query, requestID).Scan( - &entry.ID, ×tampUnix, &entry.Provider, &entry.Model, - &entry.Object, &entry.Status, &latency, - &promptTokens, &completionTokens, &totalTokensRow, - &inputHistoryJSON, &outputMessageJSON, &toolsJSON, &toolCallsJSON, - ¶msJSON, &errorDetailsJSON, &speechInputJSON, &transcriptionInputJSON, - &speechOutputJSON, &transcriptionOutputJSON, &stream, - &createdAtUnix, - ) - if err != nil { - return nil, fmt.Errorf("failed to get log entry: %w", err) - } - - // Convert timestamps - entry.Timestamp = time.Unix(timestampUnix/1e9, timestampUnix%1e9) // Convert from nanoseconds - entry.CreatedAt = time.Unix(createdAtUnix/1e9, createdAtUnix%1e9) // Convert from nanoseconds - - // Handle latency - if latency.Valid { - entry.Latency = &latency.Float64 - } - - // Handle stream flag - if stream.Valid { - entry.Stream = stream.Bool - } - - // Handle token usage - if promptTokens.Valid || completionTokens.Valid || totalTokensRow.Valid { - entry.TokenUsage = &schemas.LLMUsage{} - if promptTokens.Valid { - entry.TokenUsage.PromptTokens = int(promptTokens.Int64) - } - if completionTokens.Valid { - entry.TokenUsage.CompletionTokens = int(completionTokens.Int64) - } - if totalTokensRow.Valid { - entry.TokenUsage.TotalTokens = int(totalTokensRow.Int64) - } - } - - // Deserialize JSON fields with NULL checks - if inputHistoryJSON.Valid { - json.Unmarshal([]byte(inputHistoryJSON.String), &entry.InputHistory) - } - if outputMessageJSON.Valid { - json.Unmarshal([]byte(outputMessageJSON.String), &entry.OutputMessage) - } - if toolsJSON.Valid { - json.Unmarshal([]byte(toolsJSON.String), &entry.Tools) - } - if toolCallsJSON.Valid { - json.Unmarshal([]byte(toolCallsJSON.String), &entry.ToolCalls) - } - - if paramsJSON.Valid { - json.Unmarshal([]byte(paramsJSON.String), &entry.Params) - } - if errorDetailsJSON.Valid { - json.Unmarshal([]byte(errorDetailsJSON.String), &entry.ErrorDetails) - } - if speechInputJSON.Valid { - json.Unmarshal([]byte(speechInputJSON.String), &entry.SpeechInput) - } - if transcriptionInputJSON.Valid { - json.Unmarshal([]byte(transcriptionInputJSON.String), &entry.TranscriptionInput) - } - if speechOutputJSON.Valid { - json.Unmarshal([]byte(speechOutputJSON.String), &entry.SpeechOutput) - } - if transcriptionOutputJSON.Valid { - json.Unmarshal([]byte(transcriptionOutputJSON.String), &entry.TranscriptionOutput) - } - - return &entry, nil -} - -// createContentSummary creates a searchable content summary from the log entry -func (p *LoggerPlugin) createContentSummary(entry *LogEntry) string { - var parts []string - - // Add input history content - for _, msg := range entry.InputHistory { - if msg.Content.ContentStr != nil { - parts = append(parts, *msg.Content.ContentStr) - } - } - - // Add output message content - if entry.OutputMessage != nil && entry.OutputMessage.Content.ContentStr != nil { - parts = append(parts, *entry.OutputMessage.Content.ContentStr) - } - - // Add tool calls content - if entry.ToolCalls != nil { - for _, toolCall := range *entry.ToolCalls { - if toolCall.Function.Arguments != "" { - parts = append(parts, toolCall.Function.Arguments) - } - } - } - - // Add speech input content - if entry.SpeechInput != nil && entry.SpeechInput.Input != "" { - parts = append(parts, entry.SpeechInput.Input) - if entry.SpeechInput.Instructions != "" { - parts = append(parts, entry.SpeechInput.Instructions) - } - } - - // Add transcription input prompt if available - if entry.TranscriptionInput != nil && entry.TranscriptionInput.Prompt != nil && 
*entry.TranscriptionInput.Prompt != "" { - parts = append(parts, *entry.TranscriptionInput.Prompt) - } - - // Add transcription output text - if entry.TranscriptionOutput != nil && entry.TranscriptionOutput.Text != "" { - parts = append(parts, entry.TranscriptionOutput.Text) - } - - // Add error details - if entry.ErrorDetails != nil { - parts = append(parts, entry.ErrorDetails.Error.Message) - } - - return strings.Join(parts, " ") -} - -// SearchLogs searches for log entries based on filters and pagination -func (p *LoggerPlugin) SearchLogs(filters *SearchFilters, pagination *PaginationOptions) (*SearchResult, error) { - if pagination == nil { - pagination = &PaginationOptions{ - Limit: 50, - Offset: 0, - SortBy: "timestamp", - Order: "desc", - } - } - - // Build the SQL query - query, countQuery, args := p.buildSearchQuery(filters, pagination) - - // Get total count and global statistics (exclude LIMIT and OFFSET args) - filterArgs := args[:len(args)-2] - - var totalCount int64 - err := p.db.QueryRow(countQuery, filterArgs...).Scan(&totalCount) - if err != nil { - return nil, fmt.Errorf("failed to get total count: %w", err) - } - - // Calculate global statistics only from completed requests (exclude processing status) - var globalAverageLatency float64 - var globalTotalTokens int64 - var globalSuccessfulRequests int64 - var globalCompletedRequests int64 - - if totalCount > 0 { - // Build statistics query with same filters but no pagination, excluding processing entries - statsQuery := strings.Replace(countQuery, "COUNT(*)", - "AVG(latency) as avg_latency, SUM(total_tokens) as total_tokens, COUNT(CASE WHEN status = 'success' THEN 1 END) as successful_requests, COUNT(CASE WHEN status IN ('success', 'error') THEN 1 END) as completed_requests", 1) - - var avgLatency sql.NullFloat64 - var totalTokens sql.NullInt64 - var successfulRequests sql.NullInt64 - var completedRequests sql.NullInt64 - - err = p.db.QueryRow(statsQuery, filterArgs...).Scan(&avgLatency, &totalTokens, &successfulRequests, &completedRequests) - if err != nil { - return nil, fmt.Errorf("failed to get global statistics: %w", err) - } - - if avgLatency.Valid { - globalAverageLatency = avgLatency.Float64 - } - if totalTokens.Valid { - globalTotalTokens = totalTokens.Int64 - } - if successfulRequests.Valid { - globalSuccessfulRequests = successfulRequests.Int64 - } - if completedRequests.Valid { - globalCompletedRequests = completedRequests.Int64 - } - } - - // Execute main query - rows, err := p.db.Query(query, args...) 
- if err != nil { - return nil, fmt.Errorf("failed to execute search query: %w", err) - } - defer rows.Close() - - var logs []LogEntry - - for rows.Next() { - var entry LogEntry - var timestampUnix sql.NullInt64 - var inputHistoryJSON, outputMessageJSON, toolsJSON, toolCallsJSON sql.NullString - var paramsJSON, errorDetailsJSON sql.NullString - var speechInputJSON, transcriptionInputJSON sql.NullString - var speechOutputJSON, transcriptionOutputJSON sql.NullString - var promptTokens, completionTokens, totalTokensRow sql.NullInt64 - var latency sql.NullFloat64 - - err := rows.Scan( - &entry.ID, ×tampUnix, &entry.Provider, &entry.Model, - &entry.Object, &entry.Status, &latency, - &promptTokens, &completionTokens, &totalTokensRow, - &inputHistoryJSON, &outputMessageJSON, &toolsJSON, &toolCallsJSON, - ¶msJSON, &errorDetailsJSON, &speechInputJSON, &transcriptionInputJSON, - &speechOutputJSON, &transcriptionOutputJSON, - ) - if err != nil { - return nil, fmt.Errorf("failed to scan row: %w", err) - } - - // Convert timestamp (handle NULL values) - if timestampUnix.Valid { - entry.Timestamp = time.Unix(timestampUnix.Int64/1e9, timestampUnix.Int64%1e9) // Convert from nanoseconds - } else { - entry.Timestamp = time.Time{} // Set to zero time if NULL - } - - // Handle latency - if latency.Valid { - entry.Latency = &latency.Float64 - } - - // Handle token usage - if promptTokens.Valid || completionTokens.Valid || totalTokensRow.Valid { - entry.TokenUsage = &schemas.LLMUsage{} - if promptTokens.Valid { - entry.TokenUsage.PromptTokens = int(promptTokens.Int64) - } - if completionTokens.Valid { - entry.TokenUsage.CompletionTokens = int(completionTokens.Int64) - } - if totalTokensRow.Valid { - entry.TokenUsage.TotalTokens = int(totalTokensRow.Int64) - } - } - - // Deserialize JSON fields with NULL checks - if inputHistoryJSON.Valid { - json.Unmarshal([]byte(inputHistoryJSON.String), &entry.InputHistory) - } - if outputMessageJSON.Valid { - json.Unmarshal([]byte(outputMessageJSON.String), &entry.OutputMessage) - } - if toolsJSON.Valid { - json.Unmarshal([]byte(toolsJSON.String), &entry.Tools) - } - if toolCallsJSON.Valid { - json.Unmarshal([]byte(toolCallsJSON.String), &entry.ToolCalls) - } - - if paramsJSON.Valid { - json.Unmarshal([]byte(paramsJSON.String), &entry.Params) - } - if errorDetailsJSON.Valid { - json.Unmarshal([]byte(errorDetailsJSON.String), &entry.ErrorDetails) - } - if speechInputJSON.Valid { - json.Unmarshal([]byte(speechInputJSON.String), &entry.SpeechInput) - } - if transcriptionInputJSON.Valid { - json.Unmarshal([]byte(transcriptionInputJSON.String), &entry.TranscriptionInput) - } - if speechOutputJSON.Valid { - json.Unmarshal([]byte(speechOutputJSON.String), &entry.SpeechOutput) - } - if transcriptionOutputJSON.Valid { - json.Unmarshal([]byte(transcriptionOutputJSON.String), &entry.TranscriptionOutput) - } - - logs = append(logs, entry) - } - - // Calculate global success rate based on completed requests only - var successRate float64 - if globalCompletedRequests > 0 { - successRate = float64(globalSuccessfulRequests) / float64(globalCompletedRequests) * 100 - } - - return &SearchResult{ - Logs: logs, - Pagination: *pagination, - Stats: struct { - TotalRequests int64 `json:"total_requests"` - SuccessRate float64 `json:"success_rate"` - AverageLatency float64 `json:"average_latency"` - TotalTokens int64 `json:"total_tokens"` - }{ - TotalRequests: globalCompletedRequests, // Use completed requests count - SuccessRate: successRate, - AverageLatency: globalAverageLatency, - TotalTokens: 
globalTotalTokens, - }, - }, nil -} - -// buildSearchQuery constructs the SQL query based on filters and pagination -func (p *LoggerPlugin) buildSearchQuery(filters *SearchFilters, pagination *PaginationOptions) (string, string, []interface{}) { - var whereClauses []string - var args []interface{} - - baseQuery := ` - SELECT id, timestamp, provider, model, object_type, status, latency, - prompt_tokens, completion_tokens, total_tokens, - input_history, output_message, tools, tool_calls, - params, error_details, speech_input, transcription_input, - speech_output, transcription_output - FROM logs` - - countQuery := "SELECT COUNT(*) FROM logs" - - // Build WHERE clauses - if filters != nil { - // Provider filter - if len(filters.Providers) > 0 { - placeholders := make([]string, len(filters.Providers)) - for i, provider := range filters.Providers { - placeholders[i] = "?" - args = append(args, provider) - } - whereClauses = append(whereClauses, fmt.Sprintf("provider IN (%s)", strings.Join(placeholders, ","))) - } - - // Model filter - if len(filters.Models) > 0 { - placeholders := make([]string, len(filters.Models)) - for i, model := range filters.Models { - placeholders[i] = "?" - args = append(args, model) - } - whereClauses = append(whereClauses, fmt.Sprintf("model IN (%s)", strings.Join(placeholders, ","))) - } - - // Status filter - if len(filters.Status) > 0 { - placeholders := make([]string, len(filters.Status)) - for i, status := range filters.Status { - placeholders[i] = "?" - args = append(args, status) - } - whereClauses = append(whereClauses, fmt.Sprintf("status IN (%s)", strings.Join(placeholders, ","))) - } - - // Object type filter - if len(filters.Objects) > 0 { - placeholders := make([]string, len(filters.Objects)) - for i, object := range filters.Objects { - placeholders[i] = "?" 
- args = append(args, object) - } - whereClauses = append(whereClauses, fmt.Sprintf("object_type IN (%s)", strings.Join(placeholders, ","))) - } - - // Time range filters - if filters.StartTime != nil { - whereClauses = append(whereClauses, "timestamp >= ?") - args = append(args, filters.StartTime.UnixNano()) - } - if filters.EndTime != nil { - whereClauses = append(whereClauses, "timestamp <= ?") - args = append(args, filters.EndTime.UnixNano()) - } - - // Latency range filters - if filters.MinLatency != nil { - whereClauses = append(whereClauses, "latency >= ?") - args = append(args, *filters.MinLatency) - } - if filters.MaxLatency != nil { - whereClauses = append(whereClauses, "latency <= ?") - args = append(args, *filters.MaxLatency) - } - - // Token range filters - if filters.MinTokens != nil { - whereClauses = append(whereClauses, "total_tokens >= ?") - args = append(args, *filters.MinTokens) - } - if filters.MaxTokens != nil { - whereClauses = append(whereClauses, "total_tokens <= ?") - args = append(args, *filters.MaxTokens) - } - - // Content search - if filters.ContentSearch != "" { - if p.checkFTSTableExists() { - // Use FTS if available and table exists - whereClauses = append(whereClauses, "id IN (SELECT id FROM logs_fts WHERE content_summary MATCH ?)") - args = append(args, filters.ContentSearch) - } else { - // Fallback to LIKE search - whereClauses = append(whereClauses, "content_summary LIKE ?") - args = append(args, "%"+filters.ContentSearch+"%") - } - } - } - - // Add WHERE clause to queries - if len(whereClauses) > 0 { - whereClause := " WHERE " + strings.Join(whereClauses, " AND ") - baseQuery += whereClause - countQuery += whereClause - } - - // Add ORDER BY - orderBy := " ORDER BY " - switch pagination.SortBy { - case "latency": - orderBy += "latency" - case "tokens": - orderBy += "total_tokens" - default: - orderBy += "timestamp" - } - - if pagination.Order == "asc" { - orderBy += " ASC" - } else { - orderBy += " DESC" - } - - baseQuery += orderBy - - // Add LIMIT and OFFSET - baseQuery += " LIMIT ? OFFSET ?" 
- args = append(args, pagination.Limit, pagination.Offset) - - return baseQuery, countQuery, args -} - -// checkFTSTableExists verifies if the FTS table exists and is accessible -func (p *LoggerPlugin) checkFTSTableExists() bool { - var count int - err := p.db.QueryRow("SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='logs_fts'").Scan(&count) - return err == nil && count > 0 -} - -// determineObjectType determines the object type from the input -func (p *LoggerPlugin) determineObjectType(input schemas.RequestInput) string { - if input.ChatCompletionInput != nil { - return "chat.completion" - } else if input.TextCompletionInput != nil { - return "text.completion" - } else if input.EmbeddingInput != nil { - return "embedding" - } else if input.SpeechInput != nil { - return "audio.speech" - } else if input.TranscriptionInput != nil { - return "audio.transcription" - } - return "unknown" -} - -// extractInputHistory extracts input history from the request -func (p *LoggerPlugin) extractInputHistory(input schemas.RequestInput) []schemas.BifrostMessage { - var inputHistory []schemas.BifrostMessage - - if input.ChatCompletionInput != nil { - // ChatCompletionInput is *[]BifrostMessage, so we dereference it - inputHistory = *input.ChatCompletionInput - } else if input.TextCompletionInput != nil { - // TextCompletionInput is *string, so we dereference it - if *input.TextCompletionInput != "" { - inputHistory = []schemas.BifrostMessage{ - { - Role: schemas.ModelChatMessageRoleUser, - Content: schemas.MessageContent{ - ContentStr: input.TextCompletionInput, - }, - }, - } - } - } else if input.EmbeddingInput != nil { - // EmbeddingInput has Texts field - for _, text := range input.EmbeddingInput.Texts { - inputHistory = append(inputHistory, schemas.BifrostMessage{ - Role: schemas.ModelChatMessageRoleUser, - Content: schemas.MessageContent{ - ContentStr: &text, - }, - }) - } - } - - return inputHistory -} +import "fmt" // LogManager defines the main interface that combines all logging functionality type LogManager interface { @@ -947,18 +12,23 @@ type LogManager interface { GetDroppedRequests() int64 } +// PluginLogManager implements LogManager interface wrapping the plugin type PluginLogManager struct { plugin *LoggerPlugin } func (p *PluginLogManager) Search(filters *SearchFilters, pagination *PaginationOptions) (*SearchResult, error) { - return p.plugin.SearchLogs(filters, pagination) + if filters == nil || pagination == nil { + return nil, fmt.Errorf("filters and pagination cannot be nil") + } + return p.plugin.SearchLogs(*filters, *pagination) } func (p *PluginLogManager) GetDroppedRequests() int64 { return p.plugin.droppedRequests.Load() } +// GetPluginLogManager returns a LogManager interface for this plugin func (p *LoggerPlugin) GetPluginLogManager() *PluginLogManager { return &PluginLogManager{ plugin: p, diff --git a/transports/go.mod b/transports/go.mod index cbcb8289b1..ecfbdb10cd 100644 --- a/transports/go.mod +++ b/transports/go.mod @@ -6,14 +6,17 @@ require ( github.com/fasthttp/router v1.5.4 github.com/fasthttp/websocket v1.5.12 github.com/google/uuid v1.6.0 - github.com/mattn/go-sqlite3 v1.14.28 github.com/maximhq/bifrost/core v1.1.12 github.com/maximhq/bifrost/plugins/maxim v1.0.6 github.com/prometheus/client_golang v1.22.0 github.com/valyala/fasthttp v1.62.0 google.golang.org/genai v1.4.0 + gorm.io/driver/sqlite v1.6.0 + gorm.io/gorm v1.30.0 ) +replace github.com/maximhq/bifrost/core => ../core + require ( cloud.google.com/go v0.121.0 // indirect 
cloud.google.com/go/auth v0.16.0 // indirect @@ -45,9 +48,12 @@ require ( github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect github.com/googleapis/gax-go/v2 v2.14.1 // indirect github.com/gorilla/websocket v1.5.3 // indirect + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.5 // indirect github.com/klauspost/compress v1.18.0 // indirect github.com/klauspost/cpuid/v2 v2.0.9 // indirect github.com/mark3labs/mcp-go v0.32.0 // indirect + github.com/mattn/go-sqlite3 v1.14.28 // indirect github.com/maximhq/maxim-go v0.1.3 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/prometheus/client_model v0.6.1 // indirect @@ -68,7 +74,7 @@ require ( golang.org/x/net v0.40.0 // indirect golang.org/x/oauth2 v0.30.0 // indirect golang.org/x/sys v0.33.0 // indirect - golang.org/x/text v0.25.0 // indirect + golang.org/x/text v0.27.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250425173222-7b384671a197 // indirect google.golang.org/grpc v1.72.0 // indirect google.golang.org/protobuf v1.36.6 // indirect diff --git a/transports/go.sum b/transports/go.sum index 9913d83b2d..d8dbd56357 100644 --- a/transports/go.sum +++ b/transports/go.sum @@ -74,6 +74,10 @@ github.com/googleapis/gax-go/v2 v2.14.1 h1:hb0FFeiPaQskmvakKu5EbCbpntQn48jyHuvrk github.com/googleapis/gax-go/v2 v2.14.1/go.mod h1:Hb/NubMaVM88SrNkvl8X/o8XWwDJEPqouaLeN2IUxoA= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= +github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= @@ -89,8 +93,6 @@ github.com/mark3labs/mcp-go v0.32.0 h1:fgwmbfL2gbd67obg57OfV2Dnrhs1HtSdlY/i5fn7M github.com/mark3labs/mcp-go v0.32.0/go.mod h1:rXqOudj/djTORU/ThxYx8fqEVj/5pvTuuebQ2RC7uk4= github.com/mattn/go-sqlite3 v1.14.28 h1:ThEiQrnbtumT+QMknw63Befp/ce/nUPgBPMlRFEum7A= github.com/mattn/go-sqlite3 v1.14.28/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= -github.com/maximhq/bifrost/core v1.1.12 h1:GUJbTigNZmGOjCmA6DVpmXH9gp4RG63L9iKGCJTgXYE= -github.com/maximhq/bifrost/core v1.1.12/go.mod h1:Wa/BtJoHZ0+RXYomGeAL+wyBu6iD1h6vMiUHF5RTlkA= github.com/maximhq/bifrost/plugins/maxim v1.0.6 h1:m1tWjbmxW9Lz4mDhXclQhZdFt/TrRPbZwFcoWY9ZAEk= github.com/maximhq/bifrost/plugins/maxim v1.0.6/go.mod h1:+D/E498VB4JNTEzG4fYyFJf9WQaq/9FgYrmzl49mLNc= github.com/maximhq/maxim-go v0.1.3 h1:nVzdz3hEjZVxmWHARWIM+Yrn1Jp50qrsK4BA/sz2jj8= @@ -154,12 +156,12 @@ golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY= golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds= golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI= golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU= -golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ= -golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= 
+golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= +golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= -golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4= -golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA= +golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4= +golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU= google.golang.org/genai v1.4.0 h1:i3D6q5UTLoAHuXOaDtJnA4Lcz6v+aBP3phGBYOgzEm4= google.golang.org/genai v1.4.0/go.mod h1:TyfOKRz/QyCaj6f/ZDt505x+YreXnY40l2I6k8TvgqY= google.golang.org/genproto/googleapis/rpc v0.0.0-20250425173222-7b384671a197 h1:29cjnHVylHwTzH66WfFZqgSQgnxzvWE+jvBwpZCLRxY= @@ -172,4 +174,8 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/driver/sqlite v1.6.0 h1:WHRRrIiulaPiPFmDcod6prc4l2VGVWHz80KspNsxSfQ= +gorm.io/driver/sqlite v1.6.0/go.mod h1:AO9V1qIQddBESngQUKWL9yoH93HIeA1X6V633rBwyT8= +gorm.io/gorm v1.30.0 h1:qbT5aPv1UH8gI99OsRlvDToLxW5zR7FzS9acZDOZcgs= +gorm.io/gorm v1.30.0/go.mod h1:8Z33v652h4//uMA76KjeDH8mJXPm1QNCYrMeatR0DOE= nullprogram.com/x/optparse v1.0.0/go.mod h1:KdyPE+Igbe0jQUrVfMqDMeJQIJZEuyV7pjYmp6pbG50= diff --git a/ui/components/config/provider-form.tsx b/ui/components/config/provider-form.tsx index 965e4ce71c..b5c3878131 100644 --- a/ui/components/config/provider-form.tsx +++ b/ui/components/config/provider-form.tsx @@ -745,6 +745,16 @@ export default function ProviderForm({ provider, onSave, onCancel, existingProvi {/* Network Tab */} {selectedTab === 'network' && (
+									{provider && (
+										<Alert>
+											<AlertDescription>
+												The settings below require a Bifrost service restart to take effect. Current connections will continue with
+												existing settings until restart.
+											</AlertDescription>
+										</Alert>
+									)}
+
 									{/* Network Configuration */}
@@ -773,12 +783,13 @@ export default function ProviderForm({ provider, onSave, onCancel, existingProvi
 									type="number"
 									placeholder="30"
 									value={networkConfig.default_request_timeout_in_seconds}
-									onChange={(e) =>
+									onChange={(e) => {
 										updateField('networkConfig', {
 											...networkConfig,
 											default_request_timeout_in_seconds: Number.parseInt(e.target.value) || 30,
-										})
-									}
+										});
+									}}
+									min={1}
 									className="transition-all duration-200 ease-in-out"
 								/>
@@ -794,6 +805,7 @@ export default function ProviderForm({ provider, onSave, onCancel, existingProvi
 											max_retries: Number.parseInt(e.target.value) || 0,
 										})
 									}
+									min={0}
 									className="transition-all duration-200 ease-in-out"
 								/>
@@ -899,7 +911,7 @@ export default function ProviderForm({ provider, onSave, onCancel, existingProvi
 										onChange={(e) =>
 											updateField('performanceConfig', {
 												...performanceConfig,
-												concurrency: Number.parseInt(e.target.value) || 0,
+												concurrency: Number.parseInt(e.target.value) || 1,
 											})
 										}
 										className={`transition-all duration-200 ease-in-out ${!performanceValid ? 'border-destructive' : ''}`}
@@ -913,7 +925,7 @@ export default function ProviderForm({ provider, onSave, onCancel, existingProvi
 										onChange={(e) =>
 											updateField('performanceConfig', {
 												...performanceConfig,
-												buffer_size: Number.parseInt(e.target.value) || 0,
+												buffer_size: Number.parseInt(e.target.value) || 10,
 											})
 										}
 										className={`transition-all duration-200 ease-in-out ${!performanceValid ? 'border-destructive' : ''}`}
diff --git a/ui/components/logs/log-detail-sheet.tsx b/ui/components/logs/log-detail-sheet.tsx
index 3de79cd74c..2dd2fb756c 100644
--- a/ui/components/logs/log-detail-sheet.tsx
+++ b/ui/components/logs/log-detail-sheet.tsx
@@ -3,13 +3,14 @@
 import { Badge } from '@/components/ui/badge'
 import { Sheet, SheetContent, SheetHeader, SheetTitle } from '@/components/ui/sheet'
 import { LogEntry } from '@/lib/types/logs'
-import { DollarSign, FileText, Timer } from 'lucide-react'
+import { DollarSign, FileText, Info, Timer } from 'lucide-react'
 import LogEntryDetailsView from './ui/log-entry-details-view'
 import moment from 'moment'
 import { DottedSeparator } from '@/components/ui/separator'
 import { PROVIDER_LABELS, Provider, Status, STATUS_COLORS, REQUEST_TYPE_LABELS, REQUEST_TYPE_COLORS } from '@/lib/constants/logs'
 import { CodeEditor } from './ui/code-editor'
 import LogMessageView from './ui/log-message-view'
+import { Tooltip, TooltipContent, TooltipProvider, TooltipTrigger } from '@/components/ui/tooltip'
 import SpeechView from './ui/speech-view'
 import TranscriptionView from './ui/transcription-view'
@@ -203,7 +204,22 @@ export function LogDetailSheet({ log, open, onOpenChange }: LogDetailSheetProps)
 							<>
 								{log.output_message && (
 									<>
-										<div className="text-sm font-medium">Response</div>
+										<div className="flex items-center gap-2">
+											<div className="text-sm font-medium">Response</div>
+											{log.stream && (
+												<TooltipProvider>
+													<Tooltip>
+														<TooltipTrigger>
+															<Info className="h-4 w-4" />
+														</TooltipTrigger>
+														<TooltipContent>
+															The response shown may appear incomplete or out of order due to the way streamed data is accumulated for real-time
+															display.
+														</TooltipContent>
+													</Tooltip>
+												</TooltipProvider>
+											)}
+										</div>
 									</>
 								)}
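
For orientation: the `transports/go.mod` hunk above swaps the direct `database/sql` + `github.com/mattn/go-sqlite3` plumbing for GORM (`gorm.io/gorm` with `gorm.io/driver/sqlite`), which is why the hand-written `Scan` loops and `buildSearchQuery` string assembly in the logger plugin are deleted. A minimal sketch of what a GORM-backed store looks like — the `logRow` model, its field set, and the `logs.db` path are illustrative assumptions, not the plugin's actual schema:

```go
package main

import (
	"log"

	"gorm.io/driver/sqlite"
	"gorm.io/gorm"
)

// logRow is a hypothetical stand-in for the plugin's LogEntry model;
// struct tags replace the hand-written CREATE TABLE and Scan plumbing.
type logRow struct {
	ID          string `gorm:"primaryKey"`
	Provider    string `gorm:"index"`
	Model       string `gorm:"index"`
	Status      string
	Latency     *float64
	TotalTokens int64
	Timestamp   int64 `gorm:"index"`
}

func main() {
	// mattn/go-sqlite3 is now only an indirect dependency, pulled in by the driver.
	db, err := gorm.Open(sqlite.Open("logs.db"), &gorm.Config{})
	if err != nil {
		log.Fatalf("open log store: %v", err)
	}

	// AutoMigrate stands in for manual schema management.
	if err := db.AutoMigrate(&logRow{}); err != nil {
		log.Fatalf("migrate: %v", err)
	}

	// A filtered, sorted, paginated query of the kind SearchLogs
	// previously built by concatenating WHERE clauses and placeholders.
	var rows []logRow
	db.Where("provider IN ?", []string{"openai"}).
		Order("timestamp DESC").
		Limit(50).
		Find(&rows)
	log.Printf("fetched %d rows", len(rows))
}
```

With a model along these lines, GORM's query builder takes over the role of the removed placeholder assembly, and the nil-guarded `Search` wrapper in `PluginLogManager` can pass filter and pagination values through directly.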