Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions core/bifrost.go
Original file line number Diff line number Diff line change
Expand Up @@ -1031,6 +1031,136 @@ func (bifrost *Bifrost) RegisterMCPTool(name, description string, handler func(a
return bifrost.mcpManager.registerTool(name, description, handler, toolSchema)
}

// IMPORTANT: Running the MCP client management operations (GetMCPClients, AddMCPClient, RemoveMCPClient, EditMCPClientTools)
// may temporarily increase latency for incoming requests while the operations are being processed.
// These operations involve network I/O and connection management that require mutex locks
// which can block briefly during execution.

// GetMCPClients returns all MCP clients managed by the Bifrost instance.
//
// Returns:
// - []schemas.MCPClient: List of all MCP clients
// - error: Any retrieval error
func (bifrost *Bifrost) GetMCPClients() ([]schemas.MCPClient, error) {
if bifrost.mcpManager == nil {
return nil, fmt.Errorf("MCP is not configured in this Bifrost instance")
}

clients, err := bifrost.mcpManager.GetClients()
if err != nil {
return nil, err
}

clientsInConfig := make([]schemas.MCPClient, 0, len(clients))
for _, client := range clients {
tools := make([]string, 0, len(client.ToolMap))
for toolName := range client.ToolMap {
tools = append(tools, toolName)
}

state := schemas.MCPConnectionStateConnected
if client.Conn == nil {
state = schemas.MCPConnectionStateDisconnected
}

clientsInConfig = append(clientsInConfig, schemas.MCPClient{
Name: client.Name,
Config: client.ExecutionConfig,
Tools: tools,
State: state,
})
}

return clientsInConfig, nil
}

// ReconnectMCPClient attempts to reconnect an MCP client if it is disconnected.
//
// Parameters:
// - name: Name of the client to reconnect
//
// Returns:
// - error: Any reconnection error
func (bifrost *Bifrost) ReconnectMCPClient(name string) error {
if bifrost.mcpManager == nil {
return fmt.Errorf("MCP is not configured in this Bifrost instance")
}

return bifrost.mcpManager.ReconnectClient(name)
}

// AddMCPClient adds a new MCP client to the Bifrost instance.
// This allows for dynamic MCP client management at runtime.
//
// Parameters:
// - config: MCP client configuration
//
// Returns:
// - error: Any registration error
//
// Example:
//
// err := bifrost.AddMCPClient(schemas.MCPClientConfig{
// Name: "my-mcp-client",
// ConnectionType: schemas.MCPConnectionTypeHTTP,
// ConnectionString: &url,
// })
func (bifrost *Bifrost) AddMCPClient(config schemas.MCPClientConfig) error {
if bifrost.mcpManager == nil {
return fmt.Errorf("MCP is not configured in this Bifrost instance")
}

return bifrost.mcpManager.AddClient(config)
}

// RemoveMCPClient removes an MCP client from the Bifrost instance.
// This allows for dynamic MCP client management at runtime.
//
// Parameters:
// - name: Name of the client to remove
//
// Returns:
// - error: Any removal error
//
// Example:
//
// err := bifrost.RemoveMCPClient("my-mcp-client")
// if err != nil {
// log.Fatalf("Failed to remove MCP client: %v", err)
// }
func (bifrost *Bifrost) RemoveMCPClient(name string) error {
if bifrost.mcpManager == nil {
return fmt.Errorf("MCP is not configured in this Bifrost instance")
}

return bifrost.mcpManager.RemoveClient(name)
}

// EditMCPClientTools edits the tools of an MCP client.
// This allows for dynamic MCP client tool management at runtime.
//
// Parameters:
// - name: Name of the client to edit
// - toolsToAdd: Tools to add to the client
// - toolsToRemove: Tools to remove from the client
//
// Returns:
// - error: Any edit error
//
// Example:
//
// err := bifrost.EditMCPClientTools("my-mcp-client", []string{"tool1", "tool2"}, []string{"tool3"})
// if err != nil {
// log.Fatalf("Failed to edit MCP client tools: %v", err)
// }
func (bifrost *Bifrost) EditMCPClientTools(name string, toolsToAdd []string, toolsToRemove []string) error {
if bifrost.mcpManager == nil {
return fmt.Errorf("MCP is not configured in this Bifrost instance")
}

return bifrost.mcpManager.EditClientTools(name, toolsToAdd, toolsToRemove)
}

// Cleanup gracefully stops all workers when triggered.
// It closes all request channels and waits for workers to exit.
func (bifrost *Bifrost) Cleanup() {
Expand Down
217 changes: 182 additions & 35 deletions core/mcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"maps"
"os"
"slices"
"strings"
Expand Down Expand Up @@ -99,29 +100,191 @@ func newMCPManager(config schemas.MCPConfig, logger schemas.Logger) (*MCPManager

// Process client configs: create client map entries and establish connections
for _, clientConfig := range config.ClientConfigs {
// Validate client configuration
if err := validateMCPClientConfig(&clientConfig); err != nil {
return nil, fmt.Errorf("invalid MCP client configuration: %w", err)
if err := manager.AddClient(clientConfig); err != nil {
manager.logger.Warn(fmt.Sprintf("%s Failed to add MCP client %s: %v", MCPLogPrefix, clientConfig.Name, err))
}
}

// Create client map entry
manager.clientMap[clientConfig.Name] = &MCPClient{
Name: clientConfig.Name,
ExecutionConfig: clientConfig,
ToolMap: make(map[string]schemas.Tool),
}
manager.logger.Info(MCPLogPrefix + " MCP Manager initialized")

// Attempt to establish connection
err := manager.connectToMCPClient(clientConfig)
if err != nil {
logger.Warn(fmt.Sprintf("%s Failed to connect to MCP client %s: %v", MCPLogPrefix, clientConfig.Name, err))
// Continue with other connections even if one fails
return manager, nil
}

// GetClients returns all MCP clients managed by the manager.
//
// Returns:
// - []*MCPClient: List of all MCP clients
// - error: Any retrieval error
func (m *MCPManager) GetClients() ([]MCPClient, error) {
m.mu.RLock()
defer m.mu.RUnlock()

clients := make([]MCPClient, 0, len(m.clientMap))
for _, client := range m.clientMap {
clients = append(clients, *client)
}

return clients, nil
}

// ReconnectClient attempts to reconnect an MCP client if it is disconnected.
func (m *MCPManager) ReconnectClient(name string) error {
m.mu.Lock()
defer m.mu.Unlock()

client, ok := m.clientMap[name]
if !ok {
return fmt.Errorf("client %s not found", name)
}

if client.Conn != nil {
return fmt.Errorf("client %s is already connected", name)
}

m.mu.Unlock()

// connectToMCPClient handles locking internally
err := m.connectToMCPClient(client.ExecutionConfig)
if err != nil {
return fmt.Errorf("failed to connect to MCP client %s: %w", name, err)
}

return nil
}
Comment thread
Pratham-Mishra04 marked this conversation as resolved.

// AddClient adds a new MCP client to the manager.
// It validates the client configuration and establishes a connection.
//
// Parameters:
// - config: MCP client configuration
//
// Returns:
func (m *MCPManager) AddClient(config schemas.MCPClientConfig) error {
if err := validateMCPClientConfig(&config); err != nil {
return fmt.Errorf("invalid MCP client configuration: %w", err)
}

// Make a copy of the config to use after unlocking
configCopy := config

m.mu.Lock()

if _, ok := m.clientMap[config.Name]; ok {
m.mu.Unlock()
return fmt.Errorf("client %s already exists", config.Name)
}

// Create placeholder entry
m.clientMap[config.Name] = &MCPClient{
Name: config.Name,
ExecutionConfig: config,
ToolMap: make(map[string]schemas.Tool),
}

// Temporarily unlock for the connection attempt
// This is to avoid deadlocks when the connection attempt is made
m.mu.Unlock()

// Connect using the copied config
if err := m.connectToMCPClient(configCopy); err != nil {
// Re-lock to clean up the failed entry
m.mu.Lock()
delete(m.clientMap, config.Name)
m.mu.Unlock()
return fmt.Errorf("failed to connect to MCP client %s: %w", config.Name, err)
}

return nil
}

// RemoveClient removes an MCP client from the manager.
// It handles cleanup for all transport types (HTTP, STDIO, SSE).
//
// Parameters:
// - name: Name of the client to remove
func (m *MCPManager) RemoveClient(name string) error {
m.mu.Lock()
defer m.mu.Unlock()

return m.removeClientUnsafe(name)
}

func (m *MCPManager) removeClientUnsafe(name string) error {
client, ok := m.clientMap[name]
if !ok {
return fmt.Errorf("client %s not found", name)
}

m.logger.Info(fmt.Sprintf("%s Disconnecting MCP client: %s", MCPLogPrefix, name))

// Cancel SSE context if present (required for proper SSE cleanup)
if client.cancelFunc != nil {
client.cancelFunc()
client.cancelFunc = nil
}

// Close the client transport connection
// This handles cleanup for all transport types (HTTP, STDIO, SSE)
if client.Conn != nil {
if err := client.Conn.Close(); err != nil {
m.logger.Error(fmt.Errorf("%s Failed to close MCP client %s: %w", MCPLogPrefix, name, err))
}
client.Conn = nil
}

manager.logger.Info(MCPLogPrefix + " MCP Manager initialized")
// Clear client tool map
client.ToolMap = make(map[string]schemas.Tool)

return manager, nil
delete(m.clientMap, name)
return nil
}

func (m *MCPManager) EditClientTools(name string, toolsToAdd []string, toolsToRemove []string) error {
m.mu.Lock()
defer m.mu.Unlock()

client, ok := m.clientMap[name]
if !ok {
return fmt.Errorf("client %s not found", name)
}

if client.Conn == nil {
return fmt.Errorf("client %s has no active connection", name)
}

// Update the client's execution config with new tool filters
config := client.ExecutionConfig
config.ToolsToExecute = toolsToAdd
config.ToolsToSkip = toolsToRemove

// Store the updated config
client.ExecutionConfig = config

// Clear current tool map
client.ToolMap = make(map[string]schemas.Tool)

// Temporarily unlock for the network call
m.mu.Unlock()

// Retrieve tools with updated configuration
tools, err := m.retrieveExternalTools(context.Background(), client.Conn, config)

// Re-lock to update the tool map
m.mu.Lock()

// Verify client still exists
if _, ok := m.clientMap[name]; !ok {
return fmt.Errorf("client %s was removed during tool update", name)
}

if err != nil {
return fmt.Errorf("failed to retrieve external tools: %w", err)
}

// Store discovered tools
maps.Copy(client.ToolMap, tools)

return nil
}

// ============================================================================
Expand Down Expand Up @@ -934,26 +1097,10 @@ func (m *MCPManager) cleanup() error {
defer m.mu.Unlock()

// Disconnect all external MCP clients
for name, client := range m.clientMap {
m.logger.Info(fmt.Sprintf("%s Disconnecting MCP client: %s", MCPLogPrefix, name))

// Cancel SSE context if present (required for proper SSE cleanup)
if client.cancelFunc != nil {
client.cancelFunc()
client.cancelFunc = nil
}

// Close the client transport connection
// This handles cleanup for all transport types (HTTP, STDIO, SSE)
if client.Conn != nil {
if err := client.Conn.Close(); err != nil {
m.logger.Error(fmt.Errorf("%s Failed to close MCP client %s: %w", MCPLogPrefix, name, err))
}
client.Conn = nil
for name := range m.clientMap {
if err := m.removeClientUnsafe(name); err != nil {
m.logger.Error(fmt.Errorf("%s Failed to remove MCP client %s: %w", MCPLogPrefix, name, err))
}

// Clear client tool map
client.ToolMap = make(map[string]schemas.Tool)
}

// Clear the client map
Expand Down
Loading