Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions core/bifrost.go
Original file line number Diff line number Diff line change
Expand Up @@ -2180,10 +2180,10 @@ func (bifrost *Bifrost) UpdateProvider(providerKey schemas.ModelProvider) error

// Step 1: Create new ProviderQueue with updated buffer size
newPq := &ProviderQueue{
queue: make(chan *ChannelMessage, providerConfig.ConcurrencyAndBufferSize.BufferSize),
done: make(chan struct{}),
queue: make(chan *ChannelMessage, providerConfig.ConcurrencyAndBufferSize.BufferSize),
done: make(chan struct{}),
signalOnce: sync.Once{},
closeOnce: sync.Once{},
closeOnce: sync.Once{},
}

// Step 2: Atomically replace the queue FIRST (new producers immediately get the new queue)
Expand Down Expand Up @@ -2695,10 +2695,10 @@ func (bifrost *Bifrost) createBaseProvider(providerKey schemas.ModelProvider, co
func (bifrost *Bifrost) prepareProvider(providerKey schemas.ModelProvider, config *schemas.ProviderConfig) error {
// Create ProviderQueue with lifecycle management
pq := &ProviderQueue{
queue: make(chan *ChannelMessage, config.ConcurrencyAndBufferSize.BufferSize),
done: make(chan struct{}),
queue: make(chan *ChannelMessage, config.ConcurrencyAndBufferSize.BufferSize),
done: make(chan struct{}),
signalOnce: sync.Once{},
closeOnce: sync.Once{},
closeOnce: sync.Once{},
}

bifrost.requestQueues.Store(providerKey, pq)
Expand Down
36 changes: 33 additions & 3 deletions core/mcp/health_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,24 @@ func (chm *ClientHealthMonitor) Start() {
return // Already monitoring
}

// Check client exists FIRST before allocating resources
chm.manager.mu.RLock()
clientState, exists := chm.manager.clientMap[chm.clientID]
chm.manager.mu.RUnlock()

if !exists {
// Use clientID for logging when client is missing
logger.Error("%s Health monitor failed to start for client %s, client not found in manager", MCPLogPrefix, chm.clientID)
return
}

// Now allocate resources (after validation)
chm.isMonitoring = true
chm.ctx, chm.cancel = context.WithCancel(context.Background())
chm.ticker = time.NewTicker(chm.interval)

go chm.monitorLoop()
logger.Debug(fmt.Sprintf("%s Health monitor started for client %s (interval: %v)", MCPLogPrefix, chm.clientID, chm.interval))
logger.Debug("%s Health monitor started for client %s", MCPLogPrefix, clientState.ExecutionConfig.Name)
}

// Stop stops monitoring the client's health
Expand All @@ -82,14 +94,32 @@ func (chm *ClientHealthMonitor) Stop() {
return // Not monitoring
}

// Acquire read lock before reading clientMap to avoid race condition
chm.manager.mu.RLock()
clientState, exists := chm.manager.clientMap[chm.clientID]
chm.manager.mu.RUnlock()

// Determine display name for logging: use clientState.ExecutionConfig.Name if available, otherwise fall back to clientID
displayName := chm.clientID
if exists {
displayName = clientState.ExecutionConfig.Name
}

// Always perform cleanup even when client is missing
chm.isMonitoring = false
if chm.ticker != nil {
chm.ticker.Stop()
}
if chm.cancel != nil {
chm.cancel()
}
logger.Debug(fmt.Sprintf("%s Health monitor stopped for client %s", MCPLogPrefix, chm.clientID))

Comment thread
coderabbitai[bot] marked this conversation as resolved.
if !exists {
logger.Error("%s Health monitor failed to stop for client %s, client not found in manager", MCPLogPrefix, displayName)
return
}

logger.Debug("%s Health monitor stopped for client %s", MCPLogPrefix, displayName)
}

// monitorLoop runs the health check loop
Expand Down Expand Up @@ -175,7 +205,7 @@ func (chm *ClientHealthMonitor) updateClientState(state schemas.MCPConnectionSta

// Log after releasing the lock
if stateChanged {
logger.Info(fmt.Sprintf("%s Client %s connection state changed to: %s", MCPLogPrefix, chm.clientID, state))
logger.Info(fmt.Sprintf("%s Client %s connection state changed to: %s", MCPLogPrefix, clientState.ExecutionConfig.Name, state))
Comment thread
Pratham-Mishra04 marked this conversation as resolved.
}
}

Expand Down
4 changes: 2 additions & 2 deletions core/providers/mistral/mistral.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ func (provider *MistralProvider) TranscriptionStream(ctx *schemas.BifrostContext
if ctx.Err() != nil {
return
}

line := scanner.Text()

// Skip empty lines (event delimiter)
Expand Down Expand Up @@ -507,7 +507,7 @@ func (provider *MistralProvider) TranscriptionStream(ctx *schemas.BifrostContext
// Handle scanner errors
if err := scanner.Err(); err != nil {
// If context was cancelled/timed out, let defer handle it
if ctx.Err() != nil {
if ctx.Err() != nil {
return
}
ctx.SetValue(schemas.BifrostContextKeyStreamEndIndicator, true)
Expand Down
1 change: 0 additions & 1 deletion core/schemas/mcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ func NewMCPClientConfigFromMap(configMap map[string]any) *MCPClientConfig {
return &config
}


// HttpHeaders returns the HTTP headers for the MCP client config.
func (c *MCPClientConfig) HttpHeaders() map[string]string {
headers := make(map[string]string)
Expand Down
Loading
Loading