diff --git a/core/bifrost.go b/core/bifrost.go index 4025d6bcd2..05f51c78ca 100644 --- a/core/bifrost.go +++ b/core/bifrost.go @@ -5,8 +5,11 @@ package bifrost import ( "context" + "encoding/json" "fmt" "math/rand" + "runtime" "slices" "strings" "sync" @@ -40,6 +43,7 @@ type ChannelMessage struct { ResponseStream chan chan *schemas.BifrostStream Err chan schemas.BifrostError Type RequestType + Timestamp time.Time } // Bifrost manages providers and maintains sepcified open channels for concurrent processing. @@ -71,6 +75,28 @@ type Bifrost struct { backgroundCtx context.Context // Shared background context for nil context handling mcpManager *MCPManager // MCP integration manager (nil if MCP not configured) dropExcessRequests atomic.Bool // If true, in cases where the queue is full, requests will not wait for the queue to be empty and will be dropped instead. + + providerInstances sync.Map // provider instances for accessing metrics (thread-safe) + + // Performance Tracking + metrics AtomicMetrics + + // Pool usage counters + channelMessageGets atomic.Int64 + channelMessagePuts atomic.Int64 + channelMessageCreations atomic.Int64 + responseChannelGets atomic.Int64 + responseChannelPuts atomic.Int64 + responseChannelCreations atomic.Int64 + errorChannelGets atomic.Int64 + errorChannelPuts atomic.Int64 + errorChannelCreations atomic.Int64 + responseStreamGets atomic.Int64 + responseStreamPuts atomic.Int64 + responseStreamCreations atomic.Int64 + pluginPipelineGets atomic.Int64 + pluginPipelinePuts atomic.Int64 + pluginPipelineCreations atomic.Int64 } // PluginPipeline encapsulates the execution of plugin PreHooks and PostHooks, tracks how many plugins ran, and manages short-circuiting and error aggregation. @@ -101,6 +127,8 @@ var retryableStatusCodes = map[int]bool{ // Returns an error if initialization fails. // Initial Memory Allocations happens here as per the initial pool size. 
func Init(config schemas.BifrostConfig) (*Bifrost, error) { if config.Account == nil { return nil, fmt.Errorf("account is required to initialize Bifrost") } @@ -117,26 +145,31 @@ func Init(config schemas.BifrostConfig) (*Bifrost, error) { // Initialize object pools bifrost.channelMessagePool = sync.Pool{ New: func() interface{} { + bifrost.channelMessageCreations.Add(1) return &ChannelMessage{} }, } bifrost.responseChannelPool = sync.Pool{ New: func() interface{} { + bifrost.responseChannelCreations.Add(1) return make(chan *schemas.BifrostResponse, 1) }, } bifrost.errorChannelPool = sync.Pool{ New: func() interface{} { + bifrost.errorChannelCreations.Add(1) return make(chan schemas.BifrostError, 1) }, } bifrost.responseStreamPool = sync.Pool{ New: func() interface{} { + bifrost.responseStreamCreations.Add(1) return make(chan chan *schemas.BifrostStream, 1) }, } bifrost.pluginPipelinePool = sync.Pool{ New: func() interface{} { + bifrost.pluginPipelineCreations.Add(1) return &PluginPipeline{ preHookErrors: make([]error, 0), postHookErrors: make([]error, 0), @@ -431,6 +464,9 @@ transferComplete: return fmt.Errorf("failed to create provider instance for %s: %v", providerKey, err) } + // Store updated provider instance for metrics access + bifrost.providerInstances.Store(providerKey, provider) + // Step 8: Start new workers with updated concurrency bifrost.logger.Debug(fmt.Sprintf("Starting %d new workers for provider %s with buffer size %d", providerConfig.ConcurrencyAndBufferSize.Concurrency, @@ -719,6 +755,9 @@ func (bifrost *Bifrost) prepareProvider(providerKey schemas.ModelProvider, confi return fmt.Errorf("failed to create provider for the given key: %v", err) } + // Store provider instance for metrics access + bifrost.providerInstances.Store(providerKey, provider) + for range providerConfig.ConcurrencyAndBufferSize.Concurrency { waitGroupValue, _ := bifrost.waitGroups.Load(providerKey) waitGroup := waitGroupValue.(*sync.WaitGroup) @@ 
-796,6 +835,9 @@ func (bifrost *Bifrost) getProviderQueue(providerKey schemas.ModelProvider) (cha return queue, nil } bifrost.logger.Debug(fmt.Sprintf("Creating new request queue for provider %s at runtime", providerKey)) config, err := bifrost.account.GetConfigForProvider(providerKey) @@ -970,10 +1012,15 @@ func (bifrost *Bifrost) handleStreamRequest(ctx context.Context, req *schemas.Bi // tryRequest is a generic function that handles common request processing logic // It consolidates queue setup, plugin pipeline execution, enqueue logic, and response handling func (bifrost *Bifrost) tryRequest(req *schemas.BifrostRequest, ctx context.Context, requestType RequestType) (*schemas.BifrostResponse, *schemas.BifrostError) { + startTime := time.Now() + + // Track queue acquisition time + queueStart := time.Now() queue, err := bifrost.getProviderQueue(req.Provider) if err != nil { return nil, newBifrostError(err) } + queueTime := time.Since(queueStart) // Handle nil context early to prevent blocking if ctx == nil { @@ -988,7 +1035,11 @@ func (bifrost *Bifrost) tryRequest(req *schemas.BifrostRequest, ctx context.Cont pipeline := bifrost.getPluginPipeline() defer bifrost.releasePluginPipeline(pipeline) + // Track plugin pre-hook time + pluginPreStart := time.Now() preReq, shortCircuit, preCount := pipeline.RunPreHooks(&ctx, req) + pluginPreTime := time.Since(pluginPreStart) + if shortCircuit != nil { // Handle short-circuit with response (success case) if shortCircuit.Response != nil { @@ -1039,16 +1090,32 @@ func (bifrost *Bifrost) tryRequest(req *schemas.BifrostRequest, ctx context.Cont var resp *schemas.BifrostResponse select { case result = <-msg.Response: + pluginPostStart := time.Now() resp, bifrostErr := pipeline.RunPostHooks(&ctx, result, nil, len(bifrost.plugins)) if bifrostErr != nil { bifrost.releaseChannelMessage(msg) return nil, 
bifrostErr } + + pluginPostTime := time.Since(pluginPostStart) + totalTime := time.Since(startTime) + bifrost.recordMetrics(queueTime, 0, 0, 0, pluginPreTime, pluginPostTime, totalTime, true) + + // Add bifrost metrics to the response + if rawResponse, ok := resp.ExtraFields.RawResponse.(map[string]interface{}); ok { + rawResponse["bifrost_timings"] = bifrost.GetMetrics() + resp.ExtraFields.RawResponse = rawResponse + } + bifrost.releaseChannelMessage(msg) return resp, nil case bifrostErrVal := <-msg.Err: bifrostErrPtr := &bifrostErrVal resp, bifrostErrPtr = pipeline.RunPostHooks(&ctx, nil, bifrostErrPtr, len(bifrost.plugins)) + + totalTime := time.Since(startTime) + bifrost.recordMetrics(queueTime, 0, 0, 0, pluginPreTime, 0, totalTime, false) + bifrost.releaseChannelMessage(msg) if bifrostErrPtr != nil { return nil, bifrostErrPtr @@ -1146,15 +1213,23 @@ func (bifrost *Bifrost) requestWorker(provider schemas.Provider, queue chan Chan }() for req := range queue { + startTime := time.Now() + queueWaitTime := startTime.Sub(req.Timestamp) + var result *schemas.BifrostResponse var stream chan *schemas.BifrostStream var bifrostError *schemas.BifrostError var err error + keySelectStart := time.Now() + var keySelectTime time.Duration + key := schemas.Key{} if providerRequiresKey(provider.GetProviderKey()) { key, err = bifrost.selectKeyFromProviderForModel(&req.Context, provider.GetProviderKey(), req.Model) + keySelectTime = time.Since(keySelectStart) if err != nil { + bifrost.recordError(0, queueWaitTime, keySelectTime, 0, 0, 0) bifrost.logger.Warn(fmt.Sprintf("Error selecting key for model %s: %v", req.Model, err)) req.Err <- schemas.BifrostError{ IsBifrostError: false, @@ -1180,8 +1255,12 @@ func (bifrost *Bifrost) requestWorker(provider schemas.Provider, queue chan Chan continue } - // Track attempts + // Track attempts and retry overhead var attempts int + var totalRetryWaitTime time.Duration + + // Track provider processing time + providerStart := time.Now() // Create 
plugin pipeline for streaming requests outside retry loop to prevent leaks var postHookRunner schemas.PostHookRunner @@ -1209,8 +1288,10 @@ func (bifrost *Bifrost) requestWorker(provider schemas.Provider, queue chan Chan )) // Calculate and apply backoff + retryWaitStart := time.Now() backoff := calculateBackoff(attempts-1, config) time.Sleep(backoff) + totalRetryWaitTime += time.Since(retryWaitStart) } bifrost.logger.Debug(fmt.Sprintf("Attempting request for provider %s", provider.GetProviderKey())) @@ -1239,6 +1320,10 @@ func (bifrost *Bifrost) requestWorker(provider schemas.Provider, queue chan Chan } } + providerTime := time.Since(providerStart) + totalTime := time.Since(startTime) + bifrost.recordMetrics(0, queueWaitTime, keySelectTime, providerTime, 0, 0, totalTime, bifrostError == nil) + if bifrostError != nil { // Add retry information to error if attempts > 0 { @@ -1394,6 +1479,7 @@ func (p *PluginPipeline) resetPluginPipeline() { // getPluginPipeline gets a PluginPipeline from the pool and configures it func (bifrost *Bifrost) getPluginPipeline() *PluginPipeline { + bifrost.pluginPipelineGets.Add(1) pipeline := bifrost.pluginPipelinePool.Get().(*PluginPipeline) pipeline.plugins = bifrost.plugins pipeline.logger = bifrost.logger @@ -1404,6 +1490,7 @@ func (bifrost *Bifrost) getPluginPipeline() *PluginPipeline { // releasePluginPipeline returns a PluginPipeline to the pool func (bifrost *Bifrost) releasePluginPipeline(pipeline *PluginPipeline) { pipeline.resetPluginPipeline() + bifrost.pluginPipelinePuts.Add(1) bifrost.pluginPipelinePool.Put(pipeline) } @@ -1413,7 +1500,9 @@ func (bifrost *Bifrost) releasePluginPipeline(pipeline *PluginPipeline) { // It also gets response and error channels from their respective pools. 
func (bifrost *Bifrost) getChannelMessage(req schemas.BifrostRequest, reqType RequestType) *ChannelMessage { // Get channels from pool + bifrost.responseChannelGets.Add(1) responseChan := bifrost.responseChannelPool.Get().(chan *schemas.BifrostResponse) + bifrost.errorChannelGets.Add(1) errorChan := bifrost.errorChannelPool.Get().(chan schemas.BifrostError) // Clear any previous values to avoid leaking between requests @@ -1427,14 +1516,17 @@ func (bifrost *Bifrost) getChannelMessage(req schemas.BifrostRequest, reqType Re } // Get message from pool and configure it + bifrost.channelMessageGets.Add(1) msg := bifrost.channelMessagePool.Get().(*ChannelMessage) msg.BifrostRequest = req msg.Response = responseChan msg.Err = errorChan msg.Type = reqType + msg.Timestamp = time.Now() // Conditionally allocate ResponseStream for streaming requests only if isStreamRequestType(reqType) { + bifrost.responseStreamGets.Add(1) responseStreamChan := bifrost.responseStreamPool.Get().(chan chan *schemas.BifrostStream) // Clear any previous values to avoid leaking between requests select { @@ -1450,7 +1542,9 @@ func (bifrost *Bifrost) getChannelMessage(req schemas.BifrostRequest, reqType Re // releaseChannelMessage returns a ChannelMessage and its channels to their respective pools. 
func (bifrost *Bifrost) releaseChannelMessage(msg *ChannelMessage) { // Put channels back in pools + bifrost.responseChannelPuts.Add(1) bifrost.responseChannelPool.Put(msg.Response) + bifrost.errorChannelPuts.Add(1) bifrost.errorChannelPool.Put(msg.Err) // Return ResponseStream to pool if it was used @@ -1460,6 +1554,7 @@ func (bifrost *Bifrost) releaseChannelMessage(msg *ChannelMessage) { case <-msg.ResponseStream: default: } + bifrost.responseStreamPuts.Add(1) bifrost.responseStreamPool.Put(msg.ResponseStream) } @@ -1467,6 +1562,7 @@ func (bifrost *Bifrost) releaseChannelMessage(msg *ChannelMessage) { msg.Response = nil msg.ResponseStream = nil msg.Err = nil + bifrost.channelMessagePuts.Add(1) bifrost.channelMessagePool.Put(msg) } @@ -1569,5 +1665,207 @@ func (bifrost *Bifrost) Cleanup() { } } + // Print final statistics as JSON + allStats := bifrost.GetAllStats() + if statsJSON, err := json.MarshalIndent(allStats, "", " "); err == nil { + bifrost.logger.Info(fmt.Sprintf("Final Statistics:\n%s", string(statsJSON))) + } else { + bifrost.logger.Warn(fmt.Sprintf("Failed to marshal statistics to JSON: %v", err)) + } + bifrost.logger.Info("Graceful Cleanup Completed") } + +// PERFORMANCE TRACKING + +// Metrics to track timing +type RequestMetrics struct { + TotalTime time.Duration `json:"total_time"` + QueueAcquisitionTime time.Duration `json:"queue_acquisition_time"` + QueueWaitTime time.Duration `json:"queue_wait_time"` + KeySelectionTime time.Duration `json:"key_selection_time"` + ProviderTime time.Duration `json:"provider_time"` + PluginPreTime time.Duration `json:"plugin_pre_time"` + PluginPostTime time.Duration `json:"plugin_post_time"` + RequestCount int64 `json:"request_count"` + ErrorCount int64 `json:"error_count"` +} + +// AtomicMetrics stores metrics using atomic operations for high-throughput scenarios +type AtomicMetrics struct { + // Counters + RequestCount int64 + ErrorCount int64 + + // Time accumulators (in nanoseconds for atomic operations) + 
TotalTimeNs int64 + QueueAcquisitionTimeNs int64 + QueueWaitTimeNs int64 + KeySelectionTimeNs int64 + ProviderTimeNs int64 + PluginPreTimeNs int64 + PluginPostTimeNs int64 +} + +func (bifrost *Bifrost) recordError(queueAcquisitionTime, queueWaitTime, keySelectTime, providerTime, pluginPreTime, pluginPostTime time.Duration) { + // Spawn goroutine to avoid blocking the request path + go func() { + atomic.AddInt64(&bifrost.metrics.RequestCount, 1) + atomic.AddInt64(&bifrost.metrics.ErrorCount, 1) + + totalTime := queueAcquisitionTime + queueWaitTime + keySelectTime + providerTime + pluginPreTime + pluginPostTime + + // Add to accumulators + atomic.AddInt64(&bifrost.metrics.TotalTimeNs, totalTime.Nanoseconds()) + atomic.AddInt64(&bifrost.metrics.QueueAcquisitionTimeNs, queueAcquisitionTime.Nanoseconds()) + atomic.AddInt64(&bifrost.metrics.QueueWaitTimeNs, queueWaitTime.Nanoseconds()) + atomic.AddInt64(&bifrost.metrics.KeySelectionTimeNs, keySelectTime.Nanoseconds()) + atomic.AddInt64(&bifrost.metrics.ProviderTimeNs, providerTime.Nanoseconds()) + atomic.AddInt64(&bifrost.metrics.PluginPreTimeNs, pluginPreTime.Nanoseconds()) + atomic.AddInt64(&bifrost.metrics.PluginPostTimeNs, pluginPostTime.Nanoseconds()) + }() +} + +func (bifrost *Bifrost) recordMetrics(queueAcquisitionTime, queueWaitTime, keySelectTime, providerTime, pluginPreTime, pluginPostTime, totalTime time.Duration, success bool) { + // Spawn goroutine to avoid blocking the request path + go func() { + atomic.AddInt64(&bifrost.metrics.RequestCount, 1) + if !success { + atomic.AddInt64(&bifrost.metrics.ErrorCount, 1) + } + + // fmt.Println("Queue acquisition:", queueAcquisitionTime, "Queue wait:", queueWaitTime) + + // Add to accumulators + atomic.AddInt64(&bifrost.metrics.TotalTimeNs, totalTime.Nanoseconds()) + atomic.AddInt64(&bifrost.metrics.QueueAcquisitionTimeNs, queueAcquisitionTime.Nanoseconds()) + atomic.AddInt64(&bifrost.metrics.QueueWaitTimeNs, queueWaitTime.Nanoseconds()) + 
atomic.AddInt64(&bifrost.metrics.KeySelectionTimeNs, keySelectTime.Nanoseconds()) + atomic.AddInt64(&bifrost.metrics.ProviderTimeNs, providerTime.Nanoseconds()) + atomic.AddInt64(&bifrost.metrics.PluginPreTimeNs, pluginPreTime.Nanoseconds()) + atomic.AddInt64(&bifrost.metrics.PluginPostTimeNs, pluginPostTime.Nanoseconds()) + }() +} + +func (bifrost *Bifrost) GetMetrics() RequestMetrics { + // Read atomic values and calculate averages + requestCount := atomic.LoadInt64(&bifrost.metrics.RequestCount) + if requestCount == 0 { + return RequestMetrics{} + } + + return RequestMetrics{ + RequestCount: requestCount, + ErrorCount: atomic.LoadInt64(&bifrost.metrics.ErrorCount), + TotalTime: time.Duration(atomic.LoadInt64(&bifrost.metrics.TotalTimeNs) / requestCount), + QueueAcquisitionTime: time.Duration(atomic.LoadInt64(&bifrost.metrics.QueueAcquisitionTimeNs) / requestCount), + QueueWaitTime: time.Duration(atomic.LoadInt64(&bifrost.metrics.QueueWaitTimeNs) / requestCount), + KeySelectionTime: time.Duration(atomic.LoadInt64(&bifrost.metrics.KeySelectionTimeNs) / requestCount), + ProviderTime: time.Duration(atomic.LoadInt64(&bifrost.metrics.ProviderTimeNs) / requestCount), + PluginPreTime: time.Duration(atomic.LoadInt64(&bifrost.metrics.PluginPreTimeNs) / requestCount), + PluginPostTime: time.Duration(atomic.LoadInt64(&bifrost.metrics.PluginPostTimeNs) / requestCount), + } +} + +// GetAllStats returns all statistics including request metrics and pool usage +func (bifrost *Bifrost) GetAllStats() map[string]interface{} { + stats := make(map[string]interface{}) + // Add request metrics + metrics := bifrost.GetMetrics() + stats["request_metrics"] = map[string]interface{}{ + "total_time": metrics.TotalTime.String(), + "queue_acquisition_time": metrics.QueueAcquisitionTime.String(), + "queue_wait_time": metrics.QueueWaitTime.String(), + "key_selection_time": metrics.KeySelectionTime.String(), + "provider_time": metrics.ProviderTime.String(), + "plugin_pre_time": 
metrics.PluginPreTime.String(), + "plugin_post_time": metrics.PluginPostTime.String(), + "request_count": metrics.RequestCount, + "error_count": metrics.ErrorCount, + "error_rate": fmt.Sprintf("%.2f%%", float64(metrics.ErrorCount)/float64(metrics.RequestCount)*100), + } + + // Add current runtime memory statistics + var currentMem runtime.MemStats + runtime.ReadMemStats(&currentMem) + + // Helper function to convert bytes to MB + bytesToMB := func(bytes uint64) string { + return fmt.Sprintf("%.2f MB", float64(bytes)/(1024*1024)) + } + + // Helper function to convert nanoseconds to microseconds + nsToMicroseconds := func(ns uint64) string { + return fmt.Sprintf("%.2f μs", float64(ns)/1000) + } + + stats["runtime_memory"] = map[string]interface{}{ + "current_alloc": bytesToMB(currentMem.Alloc), + "total_alloc": bytesToMB(currentMem.TotalAlloc), + "sys": bytesToMB(currentMem.Sys), + "heap_alloc": bytesToMB(currentMem.HeapAlloc), + "heap_sys": bytesToMB(currentMem.HeapSys), + "stack_inuse": bytesToMB(currentMem.StackInuse), + "stack_sys": bytesToMB(currentMem.StackSys), + "num_gc": currentMem.NumGC, + "gc_cpu_fraction": fmt.Sprintf("%.4f", currentMem.GCCPUFraction), + "pause_total": nsToMicroseconds(currentMem.PauseTotalNs), + "num_forced_gc": currentMem.NumForcedGC, + } + + // Add pool usage statistics + stats["pool_stats"] = bifrost.GetPoolStats() + + // Add provider-specific metrics + stats["provider_metrics"] = make(map[string]interface{}) + + // Check if OpenAI provider exists and add its metrics + if providerInstance, exists := bifrost.providerInstances.Load(schemas.OpenAI); exists { + if openAIProvider, ok := providerInstance.(*providers.OpenAIProvider); ok { + stats["provider_metrics"].(map[string]interface{})["openai"] = openAIProvider.GetOpenAIMetrics() + } + } + + return stats +} + +// GetPoolStats returns statistics about object pool usage +func (bifrost *Bifrost) GetPoolStats() map[string]interface{} { + stats := make(map[string]interface{}) + // Add channel 
message pool stats + stats["channel_message_pool"] = map[string]int64{ + "gets": bifrost.channelMessageGets.Load(), + "puts": bifrost.channelMessagePuts.Load(), + "creations": bifrost.channelMessageCreations.Load(), + } + // Add response channel pool stats + stats["response_channel_pool"] = map[string]int64{ + "gets": bifrost.responseChannelGets.Load(), + "puts": bifrost.responseChannelPuts.Load(), + "creations": bifrost.responseChannelCreations.Load(), + } + // Add response stream pool stats + stats["response_stream_pool"] = map[string]int64{ + "gets": bifrost.responseStreamGets.Load(), + "puts": bifrost.responseStreamPuts.Load(), + "creations": bifrost.responseStreamCreations.Load(), + } + // Add error channel pool stats + stats["error_channel_pool"] = map[string]int64{ + "gets": bifrost.errorChannelGets.Load(), + "puts": bifrost.errorChannelPuts.Load(), + "creations": bifrost.errorChannelCreations.Load(), + } + // Add plugin pipeline pool stats + stats["plugin_pipeline_pool"] = map[string]int64{ + "gets": bifrost.pluginPipelineGets.Load(), + "puts": bifrost.pluginPipelinePuts.Load(), + "creations": bifrost.pluginPipelineCreations.Load(), + } + // Add provider-specific pool stats + providerStats := providers.GetPoolStats() + for k, v := range providerStats { + stats[k] = v + } + return stats +} diff --git a/core/providers/openai.go b/core/providers/openai.go index dd5cef215d..1818dbb8aa 100644 --- a/core/providers/openai.go +++ b/core/providers/openai.go @@ -8,13 +8,16 @@ import ( "context" "encoding/base64" "encoding/binary" + "encoding/json" "fmt" "io" "math" + "math/rand/v2" "mime/multipart" "net/http" "strings" "sync" + "sync/atomic" "time" "github.com/bytedance/sonic" @@ -43,12 +46,14 @@ type OpenAIResponse struct { // openAIResponsePool provides a pool for OpenAI response objects. 
var openAIResponsePool = sync.Pool{ New: func() interface{} { + openAIPoolCreations.Add(1) return &schemas.BifrostResponse{} }, } // acquireOpenAIResponse gets an OpenAI response from the pool and resets it. func acquireOpenAIResponse() *schemas.BifrostResponse { + openAIPoolGets.Add(1) resp := openAIResponsePool.Get().(*schemas.BifrostResponse) *resp = schemas.BifrostResponse{} // Reset the struct return resp @@ -57,6 +62,7 @@ func acquireOpenAIResponse() *schemas.BifrostResponse { // releaseOpenAIResponse returns an OpenAI response to the pool. func releaseOpenAIResponse(resp *schemas.BifrostResponse) { if resp != nil { + openAIPoolPuts.Add(1) openAIResponsePool.Put(resp) } } @@ -67,6 +73,7 @@ type OpenAIProvider struct { client *fasthttp.Client // HTTP client for API requests streamClient *http.Client // HTTP client for streaming requests networkConfig schemas.NetworkConfig // Network configuration including extra headers + metrics OpenAIMetrics // Performance metrics for this provider sendBackRawResponse bool // Whether to include raw response in BifrostResponse } @@ -106,6 +113,7 @@ func NewOpenAIProvider(config *schemas.ProviderConfig, logger schemas.Logger) *O client: client, streamClient: streamClient, networkConfig: config.NetworkConfig, + metrics: OpenAIMetrics{}, // Initialize metrics sendBackRawResponse: config.SendBackRawResponse, } } @@ -125,6 +133,8 @@ func (provider *OpenAIProvider) TextCompletion(ctx context.Context, model string // It supports both text and image content in messages. // Returns a BifrostResponse containing the completion results or an error if the request fails. 
func (provider *OpenAIProvider) ChatCompletion(ctx context.Context, model string, key schemas.Key, messages []schemas.BifrostMessage, params *schemas.ModelParameters) (*schemas.BifrostResponse, *schemas.BifrostError) { + timings := make(map[string]time.Duration) + formattedMessages, preparedParams := prepareOpenAIChatRequest(messages, params) requestBody := mergeConfig(map[string]interface{}{ @@ -132,10 +142,18 @@ func (provider *OpenAIProvider) ChatCompletion(ctx context.Context, model string "messages": formattedMessages, }, preparedParams) + // Track JSON marshaling time + marshalStart := time.Now() jsonBody, err := sonic.Marshal(requestBody) if err != nil { + timings["json_marshaling"] = time.Since(marshalStart) + provider.recordOpenAIMetrics(timings, false) // Record metrics for failed request return nil, newBifrostOperationError(schemas.ErrProviderJSONMarshaling, err, schemas.OpenAI) } + timings["json_marshaling"] = time.Since(marshalStart) + + // Track request setup time + setupStart := time.Now() // Create request req := fasthttp.AcquireRequest() @@ -153,27 +171,57 @@ func (provider *OpenAIProvider) ChatCompletion(ctx context.Context, model string req.SetBody(jsonBody) + timings["request_setup"] = time.Since(setupStart) + + // Track HTTP request time + httpStart := time.Now() + // Make request bifrostErr := makeRequestWithContext(ctx, provider.client, req, resp) + timings["http_request"] = time.Since(httpStart) if bifrostErr != nil { + provider.recordOpenAIMetrics(timings, false) // Record metrics for failed request return nil, 
bifrostErr } + + // Track error handling time + errorStart := time.Now() // Handle error response if resp.StatusCode() != fasthttp.StatusOK { + timings["error_handling"] = time.Since(errorStart) + provider.recordOpenAIMetrics(timings, false) // Record metrics for failed request provider.logger.Debug(fmt.Sprintf("error from openai provider: %s", string(resp.Body()))) return nil, parseOpenAIError(resp) } + timings["error_handling"] = time.Since(errorStart) + + // Track response parsing time (more granular) + parseStart := time.Now() + responseBody := resp.Body() - // Pre-allocate response structs from pools + // Track pool acquisition time + poolStart := time.Now() response := acquireOpenAIResponse() + timings["pool_acquisition"] = time.Since(poolStart) defer releaseOpenAIResponse(response) // Use enhanced response handler with pre-allocated response + unmarshalStartTime := time.Now() rawResponse, bifrostErr := handleProviderResponse(responseBody, response, provider.sendBackRawResponse) + timings["json_unmarshaling"] = time.Since(unmarshalStartTime) if bifrostErr != nil { + timings["total_response_parsing"] = time.Since(parseStart) + provider.recordOpenAIMetrics(timings, false) // Record metrics for failed request return nil, bifrostErr } @@ -186,6 +234,13 @@ func (provider *OpenAIProvider) ChatCompletion(ctx context.Context, model string response.ExtraFields.Params = *params } + timings["total_response_parsing"] = time.Since(parseStart) + + response.ExtraFields.RawResponse = map[string]interface{}{ + "timings": timings, + } + + provider.recordOpenAIMetrics(timings, true) // Record metrics for successful request return response, nil } @@ -1148,6 +1203,119 @@ func parseStreamOpenAIError(resp *http.Response) *schemas.BifrostError { return bifrostErr } +// PERFORMANCE TRACKING + +// OpenAIMetrics stores provider-specific metrics using atomic operations for high-throughput scenarios +type OpenAIMetrics struct { + // Counters + RequestCount int64 + ErrorCount int64 + 
+ // Time accumulators (in nanoseconds for atomic operations) + TotalTimeNs int64 + MessagePreparationTimeNs int64 + RequestBodyPreparationTimeNs int64 + JSONMarshalingTimeNs int64 + RequestSetupTimeNs int64 + HTTPRequestTimeNs int64 + ErrorHandlingTimeNs int64 + PoolAcquisitionTimeNs int64 + JSONUnmarshalingTimeNs int64 + ResponseParsingTimeNs int64 +} + +// Counters for pool usage +var ( + openAIPoolGets atomic.Int64 + openAIPoolPuts atomic.Int64 + openAIPoolCreations atomic.Int64 +) + +// GetPoolStats returns the current pool usage statistics +func GetPoolStats() map[string]int64 { + return map[string]int64{ + "openai_pool_gets": openAIPoolGets.Load(), + "openai_pool_puts": openAIPoolPuts.Load(), + "openai_pool_creations": openAIPoolCreations.Load(), + } +} + +// recordOpenAIMetrics records provider-specific metrics atomically in a goroutine +func (provider *OpenAIProvider) recordOpenAIMetrics(timings map[string]time.Duration, success bool) { + // Spawn goroutine to avoid blocking the request path + go func() { + atomic.AddInt64(&provider.metrics.RequestCount, 1) + if !success { + atomic.AddInt64(&provider.metrics.ErrorCount, 1) + } + + // Calculate total time from all timings + var totalTime time.Duration + for _, duration := range timings { + totalTime += duration + } + + // Add to accumulators + atomic.AddInt64(&provider.metrics.TotalTimeNs, totalTime.Nanoseconds()) + + if msgPrepTime, exists := timings["total_message_preparation"]; exists { + atomic.AddInt64(&provider.metrics.MessagePreparationTimeNs, msgPrepTime.Nanoseconds()) + } + if bodyPrepTime, exists := timings["request_body_preparation"]; exists { + atomic.AddInt64(&provider.metrics.RequestBodyPreparationTimeNs, bodyPrepTime.Nanoseconds()) + } + if marshalTime, exists := timings["json_marshaling"]; exists { + atomic.AddInt64(&provider.metrics.JSONMarshalingTimeNs, marshalTime.Nanoseconds()) + } + if setupTime, exists := timings["request_setup"]; exists { + 
atomic.AddInt64(&provider.metrics.RequestSetupTimeNs, setupTime.Nanoseconds()) + } + if httpTime, exists := timings["http_request"]; exists { + atomic.AddInt64(&provider.metrics.HTTPRequestTimeNs, httpTime.Nanoseconds()) + } + if errorTime, exists := timings["error_handling"]; exists { + atomic.AddInt64(&provider.metrics.ErrorHandlingTimeNs, errorTime.Nanoseconds()) + } + if poolTime, exists := timings["pool_acquisition"]; exists { + atomic.AddInt64(&provider.metrics.PoolAcquisitionTimeNs, poolTime.Nanoseconds()) + } + if unmarshalTime, exists := timings["json_unmarshaling"]; exists { + atomic.AddInt64(&provider.metrics.JSONUnmarshalingTimeNs, unmarshalTime.Nanoseconds()) + } + if parseTime, exists := timings["total_response_parsing"]; exists { + atomic.AddInt64(&provider.metrics.ResponseParsingTimeNs, parseTime.Nanoseconds()) + } + }() +} + +// GetOpenAIMetrics returns averaged provider metrics +func (provider *OpenAIProvider) GetOpenAIMetrics() map[string]interface{} { + // Read atomic values and calculate averages + requestCount := atomic.LoadInt64(&provider.metrics.RequestCount) + if requestCount == 0 { + return map[string]interface{}{ + "request_count": 0, + "error_count": 0, + } + } + + return map[string]interface{}{ + "request_count": requestCount, + "error_count": atomic.LoadInt64(&provider.metrics.ErrorCount), + "error_rate": fmt.Sprintf("%.2f%%", float64(atomic.LoadInt64(&provider.metrics.ErrorCount))/float64(requestCount)*100), + "avg_total_time": time.Duration(atomic.LoadInt64(&provider.metrics.TotalTimeNs) / requestCount).String(), + "avg_message_preparation_time": time.Duration(atomic.LoadInt64(&provider.metrics.MessagePreparationTimeNs) / requestCount).String(), + "avg_request_body_prep_time": time.Duration(atomic.LoadInt64(&provider.metrics.RequestBodyPreparationTimeNs) / requestCount).String(), + "avg_json_marshaling_time": time.Duration(atomic.LoadInt64(&provider.metrics.JSONMarshalingTimeNs) / requestCount).String(), + "avg_request_setup_time": 
time.Duration(atomic.LoadInt64(&provider.metrics.RequestSetupTimeNs) / requestCount).String(), + "avg_http_request_time": time.Duration(atomic.LoadInt64(&provider.metrics.HTTPRequestTimeNs) / requestCount).String(), + "avg_error_handling_time": time.Duration(atomic.LoadInt64(&provider.metrics.ErrorHandlingTimeNs) / requestCount).String(), + "avg_pool_acquisition_time": time.Duration(atomic.LoadInt64(&provider.metrics.PoolAcquisitionTimeNs) / requestCount).String(), + "avg_json_unmarshaling_time": time.Duration(atomic.LoadInt64(&provider.metrics.JSONUnmarshalingTimeNs) / requestCount).String(), + "avg_response_parsing_time": time.Duration(atomic.LoadInt64(&provider.metrics.ResponseParsingTimeNs) / requestCount).String(), + } +} + func parseOpenAIErrorForStreamDataLine(jsonData string) (*schemas.BifrostStream, error) { var openAIError schemas.BifrostError if err := sonic.Unmarshal([]byte(jsonData), &openAIError); err != nil { @@ -1176,3 +1344,147 @@ func parseOpenAIErrorForStreamDataLine(jsonData string) (*schemas.BifrostStream, return errorStream, nil } + +// mockOpenAIResponse creates a mock response for OpenAI API calls +func mockOpenAIChatCompletionResponse(req *fasthttp.Request, model string) []byte { + // Create a mock response that mimics OpenAI's format + mockResp := &OpenAIResponse{ + ID: "mock-" + model + "-" + fmt.Sprintf("%d", time.Now().Unix()), + Object: "chat.completion", + Model: model, + Created: int(time.Now().Unix()), + Choices: []schemas.BifrostResponseChoice{ + { + Index: 0, + BifrostNonStreamResponseChoice: &schemas.BifrostNonStreamResponseChoice{ + Message: schemas.BifrostMessage{ + Role: schemas.ModelChatMessageRoleAssistant, + Content: schemas.MessageContent{ + ContentStr: StrPtr("This is a mock response from the Bifrost API gateway. The actual API was not called. " + + "This response has been expanded to demonstrate the system's ability to handle larger payloads. 
" + + "In a real-world scenario, this could be a comprehensive analysis, detailed explanation, or extensive documentation. " + + "The Bifrost API gateway serves as a unified interface for multiple AI providers, offering seamless integration " + + "and consistent response formats across different language models and services. " + + "When operating in mock mode, the system generates realistic responses that mirror the expected output " + + "from actual AI providers while maintaining the same structure and formatting conventions. " + + "This capability is particularly useful for testing, development, and demonstration purposes " + + "where you need predictable responses without consuming actual API credits or making real network calls. " + + "The mock responses can be configured to simulate various scenarios including success cases, " + + "error conditions, streaming responses, and different content types such as text, code, and structured data. " + + "Additionally, the system supports comprehensive logging and monitoring of all interactions, " + + "providing detailed insights into request patterns, response times, token usage, and system performance metrics. " + + "This mock response continues with additional content to reach the desired payload size of 10KB. " + + "Performance optimization is a key consideration in the design of this system, ensuring that " + + "large responses can be handled efficiently without compromising system stability or user experience. " + + "The architecture supports horizontal scaling, load balancing, and fault tolerance mechanisms " + + "to maintain high availability and reliability even under heavy load conditions. " + + "Security features include authentication, authorization, rate limiting, and comprehensive audit logging " + + "to ensure that all API interactions are properly tracked and controlled. 
" + + "The system also provides extensive configuration options allowing administrators to customize " + + "behavior based on specific requirements and use cases. " + + "Documentation and examples are provided to help developers integrate with the API quickly and effectively. " + + "The mock mode serves as an excellent starting point for understanding the API structure and response formats " + + "before moving to production deployments with actual AI provider integrations. " + + "This extended mock response demonstrates the system's capability to handle substantial content volumes " + + "while maintaining proper JSON formatting and response structure compliance. " + + "The implementation includes proper error handling, timeout management, and resource cleanup " + + "to ensure robust operation in production environments. " + + "Monitoring and alerting capabilities provide real-time visibility into system health and performance, " + + "enabling proactive identification and resolution of potential issues. " + + "The API supports various content types including plain text, markdown, HTML, JSON, XML, and binary data, " + + "making it suitable for diverse application requirements and integration scenarios. " + + "Advanced features include request transformation, response filtering, content validation, " + + "and custom middleware support for implementing specialized business logic. " + + "The system is designed to be provider-agnostic, allowing seamless switching between different AI services " + + "without requiring changes to client applications or integration code. " + + "This flexibility enables organizations to optimize costs, performance, and capabilities " + + "by leveraging the best features of multiple AI providers through a single, unified interface. " + + "Comprehensive testing suites ensure reliability and compatibility across all supported providers and features. 
" + + "The mock response system includes sophisticated simulation capabilities that can replicate " + + "real-world usage patterns and edge cases to support thorough testing and validation processes. " + + "Performance benchmarking tools are integrated to measure and optimize system throughput, latency, " + + "and resource utilization under various load conditions and configuration settings. " + + "The architecture supports both synchronous and asynchronous processing models, " + + "enabling efficient handling of both real-time interactions and batch processing scenarios. " + + "Data persistence and caching mechanisms are implemented to improve response times " + + "and reduce external API calls when appropriate, while maintaining data freshness and accuracy. " + + "The system includes comprehensive logging and analytics capabilities that provide insights " + + "into usage patterns, performance trends, and potential optimization opportunities. " + + "This mock response continues to provide additional content to demonstrate the system's ability " + + "to handle large payloads efficiently and effectively while maintaining proper formatting and structure. " + + "The implementation follows industry best practices for API design, security, and performance, " + + "ensuring compatibility with existing development workflows and deployment processes. " + + "Support for multiple programming languages and frameworks makes integration straightforward " + + "regardless of the technology stack used by client applications. " + + "The system provides detailed documentation, code examples, and interactive tutorials " + + "to help developers get started quickly and implement advanced features effectively. 
" + + "This comprehensive mock response serves as an example of the type of detailed, " + + "informative content that can be processed and delivered through the Bifrost API gateway, " + + "demonstrating its capability to handle substantial payloads while maintaining high performance " + + "The architecture supports both synchronous and asynchronous processing models, " + + "enabling efficient handling of both real-time interactions and batch processing scenarios. " + + "Data persistence and caching mechanisms are implemented to improve response times " + + "and reduce external API calls when appropriate, while maintaining data freshness and accuracy. " + + "The system includes comprehensive logging and analytics capabilities that provide insights " + + "into usage patterns, performance trends, and potential optimization opportunities. " + + "This mock response continues to provide additional content to demonstrate the system's ability " + + "to handle large payloads efficiently and effectively while maintaining proper formatting and structure. " + + "The implementation follows industry best practices for API design, security, and performance, " + + "ensuring compatibility with existing development workflows and deployment processes. " + + "Support for multiple programming languages and frameworks makes integration straightforward " + + "regardless of the technology stack used by client applications. " + + "The system provides detailed documentation, code examples, and interactive tutorials " + + "to help developers get started quickly and implement advanced features effectively. 
" + + "This comprehensive mock response serves as an example of the type of detailed, " + + "informative content that can be processed and delivered through the Bifrost API gateway, " + + "demonstrating its capability to handle substantial payloads while maintaining high performance " + + "The architecture supports both synchronous and asynchronous processing models, " + + "enabling efficient handling of both real-time interactions and batch processing scenarios. " + + "Data persistence and caching mechanisms are implemented to improve response times " + + "and reduce external API calls when appropriate, while maintaining data freshness and accuracy. " + + "The system includes comprehensive logging and analytics capabilities that provide insights " + + "into usage patterns, performance trends, and potential optimization opportunities. " + + "This mock response continues to provide additional content to demonstrate the system's ability " + + "to handle large payloads efficiently and effectively while maintaining proper formatting and structure. " + + "The implementation follows industry best practices for API design, security, and performance, " + + "ensuring compatibility with existing development workflows and deployment processes. " + + "Support for multiple programming languages and frameworks makes integration straightforward " + + "regardless of the technology stack used by client applications. " + + "The system provides detailed documentation, code examples, and interactive tutorials " + + "to help developers get started quickly and implement advanced features effectively. 
" + + "This comprehensive mock response serves as an example of the type of detailed, " + + "informative content that can be processed and delivered through the Bifrost API gateway, " + + "demonstrating its capability to handle substantial payloads while maintaining high performance " + + "The architecture supports both synchronous and asynchronous processing models, " + + "enabling efficient handling of both real-time interactions and batch processing scenarios. " + + "Data persistence and caching mechanisms are implemented to improve response times " + + "and reduce external API calls when appropriate, while maintaining data freshness and accuracy. " + + "The system includes comprehensive logging and analytics capabilities that provide insights " + + "into usage patterns, performance trends, and potential optimization opportunities. " + + "This mock response continues to provide additional content to demonstrate the system's ability " + + "to handle large payloads efficiently and effectively while maintaining proper formatting and structure. " + + "The implementation follows industry best practices for API design, security, and performance, " + + "ensuring compatibility with existing development workflows and deployment processes. 
" + + "Support for multiple programming languages and frameworks makes integration straightforward " + + "and reliability standards expected in production environments."), + }, + }, + }, + FinishReason: StrPtr("stop"), + }, + }, + Usage: schemas.LLMUsage{ + PromptTokens: 100, + CompletionTokens: 50, + TotalTokens: 150, + }, + } + + // Convert to JSON + mockJSON, err := json.Marshal(mockResp) + if err != nil { + return nil + } + + return mockJSON +} diff --git a/transports/bifrost-http/main.go b/transports/bifrost-http/main.go index a36fe4b1a4..d77446b463 100644 --- a/transports/bifrost-http/main.go +++ b/transports/bifrost-http/main.go @@ -55,9 +55,11 @@ import ( "log" "mime" "os" + "os/signal" "path" "path/filepath" "strings" + "syscall" "github.com/fasthttp/router" bifrost "github.com/maximhq/bifrost/core" @@ -426,15 +428,27 @@ func main() { // Apply CORS middleware to all routes corsHandler := corsMiddleware(r.Handler) - log.Printf("Successfully started bifrost. Serving UI on http://localhost:%s", port) - if err := fasthttp.ListenAndServe(":"+port, corsHandler); err != nil { - log.Fatalf("Error starting server: %v", err) - } + // Set up signal handling for graceful shutdown + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + + // Start server in a goroutine + go func() { + log.Printf("Successfully started bifrost. 
Serving UI on http://localhost:%s", port) + if err := fasthttp.ListenAndServe(":"+port, corsHandler); err != nil { + log.Fatalf("Error starting server: %v", err) + } + }() + + // Wait for interrupt signal + <-c + log.Println("Received interrupt signal, initiating graceful shutdown...") - // Cleanup resources on shutdown + // Perform cleanup if wsHandler != nil { wsHandler.Stop() } client.Cleanup() + log.Println("Graceful shutdown completed") } diff --git a/transports/bifrost-http/plugins/logging/main.go b/transports/bifrost-http/plugins/logging/main.go index 6329961c86..04b6dc62f2 100644 --- a/transports/bifrost-http/plugins/logging/main.go +++ b/transports/bifrost-http/plugins/logging/main.go @@ -80,7 +80,7 @@ type InitialLogData struct { Params *schemas.ModelParameters SpeechInput *schemas.SpeechInput TranscriptionInput *schemas.TranscriptionInput - Tools *[]schemas.Tool + Tools []schemas.Tool } // SearchFilters represents the available filters for log searches @@ -331,9 +331,9 @@ func (p *LoggerPlugin) PreHook(ctx *context.Context, req *schemas.BifrostRequest TranscriptionInput: req.Input.TranscriptionInput, } - if req.Params != nil && req.Params.Tools != nil { - initialData.Tools = req.Params.Tools - } + // if req.Params != nil && req.Params.Tools != nil { + // initialData.Tools = req.Params.Tools + // } // Store created timestamp in context for latency calculation optimization createdTimestamp := time.Now() @@ -363,10 +363,10 @@ func (p *LoggerPlugin) PreHook(ctx *context.Context, req *schemas.BifrostRequest Model: logMsg.InitialData.Model, InputHistoryParsed: logMsg.InitialData.InputHistory, ParamsParsed: logMsg.InitialData.Params, - ToolsParsed: logMsg.InitialData.Tools, - Status: "processing", - Stream: false, // Initially false, will be updated if streaming - CreatedAt: logMsg.Timestamp, + // ToolsParsed: logMsg.InitialData.Tools, + Status: "processing", + Stream: false, // Initially false, will be updated if streaming + CreatedAt: logMsg.Timestamp, } 
p.logCallback(initialEntry) } diff --git a/transports/bifrost-http/plugins/logging/operations.go b/transports/bifrost-http/plugins/logging/operations.go index f15d96789d..6601960207 100644 --- a/transports/bifrost-http/plugins/logging/operations.go +++ b/transports/bifrost-http/plugins/logging/operations.go @@ -25,9 +25,9 @@ func (p *LoggerPlugin) insertInitialLogEntry(requestID string, timestamp time.Ti Stream: false, CreatedAt: timestamp, // Set parsed fields for serialization - InputHistoryParsed: data.InputHistory, - ParamsParsed: data.Params, - ToolsParsed: data.Tools, + InputHistoryParsed: data.InputHistory, + ParamsParsed: data.Params, + // ToolsParsed: data.Tools, SpeechInputParsed: data.SpeechInput, TranscriptionInputParsed: data.TranscriptionInput, } diff --git a/transports/go.mod b/transports/go.mod index 815041707f..92bca33cf5 100644 --- a/transports/go.mod +++ b/transports/go.mod @@ -16,6 +16,8 @@ require ( gorm.io/gorm v1.30.0 ) +replace github.com/maximhq/bifrost/core => ../core + require ( cloud.google.com/go v0.121.0 // indirect cloud.google.com/go/auth v0.16.0 // indirect