-
Notifications
You must be signed in to change notification settings - Fork 576
feat: plugin schemas segregation for mcp plugins #1299
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
This file was deleted.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -517,7 +517,9 @@ func (m *ToolsManager) executeCode(ctx context.Context, code string) ExecutionRe | |
| default: | ||
| } | ||
|
|
||
| result, err := m.callMCPTool(timeoutCtx, clientNameFinal, toolNameFinal, argsMap, appendLog) | ||
| // Pass the original ctx (BifrostContext) to callMCPTool, not timeoutCtx | ||
| // callMCPTool will handle timeout internally | ||
| result, err := m.callMCPTool(ctx, clientNameFinal, toolNameFinal, argsMap, appendLog) | ||
|
|
||
| // Check if context was cancelled during execution | ||
| select { | ||
|
|
@@ -726,9 +728,10 @@ func (m *ToolsManager) executeCode(ctx context.Context, code string) ExecutionRe | |
| // callMCPTool calls an MCP tool and returns the result. | ||
| // It locates the client by name, constructs the MCP tool call request, executes it | ||
| // with timeout handling, and parses the response as JSON or returns it as a string. | ||
| // This function now runs MCP plugin hooks (PreMCPHook/PostMCPHook) for nested tool calls. | ||
| // | ||
| // Parameters: | ||
| // - ctx: Context for tool execution (used for timeout) | ||
| // - ctx: Context for tool execution (used for timeout and plugin hooks) | ||
| // - clientName: Name of the MCP client/server to call | ||
| // - toolName: Name of the tool to execute | ||
| // - args: Tool arguments as a map | ||
|
|
@@ -767,6 +770,209 @@ func (m *ToolsManager) callMCPTool(ctx context.Context, clientName, toolName str | |
| // The MCP server expects the original tool name, not the prefixed version | ||
| originalToolName := stripClientPrefix(toolName, clientName) | ||
|
|
||
| // ==================== PLUGIN PIPELINE INTEGRATION ==================== | ||
| // Set up parent-child request ID tracking and run plugin hooks | ||
|
|
||
| // Get original executeCode request ID from context (will become parent) | ||
| var bifrostCtx *schemas.BifrostContext | ||
| var ok bool | ||
| if bifrostCtx, ok = ctx.(*schemas.BifrostContext); !ok { | ||
| // Fallback: if not a BifrostContext, execute directly without plugins | ||
| return m.callMCPToolDirect(ctx, client, originalToolName, clientName, toolName, args, appendLog) | ||
| } | ||
|
|
||
| originalRequestID, _ := bifrostCtx.Value(schemas.BifrostContextKeyRequestID).(string) | ||
|
|
||
| // Generate new request ID for this nested tool call | ||
| var newRequestID string | ||
| if m.fetchNewRequestIDFunc != nil { | ||
| newRequestID = m.fetchNewRequestIDFunc(bifrostCtx) | ||
| } else { | ||
| // Fallback: generate a simple UUID-like ID | ||
| newRequestID = fmt.Sprintf("exec_%d_%s", time.Now().UnixNano(), toolName) | ||
| } | ||
|
|
||
| // Create new CHILD context with parent-child relationship | ||
| // IMPORTANT: We must use NewBifrostContext() to create a proper child context with its own | ||
| // userValues map. Using WithValue() would modify the parent context in-place, which would | ||
| // cause the parent executeToolCode's request ID to be overwritten with the last nested tool's | ||
| // request ID, leading to the parent's response overwriting the last nested tool's log entry. | ||
| deadline, hasDeadline := bifrostCtx.Deadline() | ||
| if !hasDeadline { | ||
| deadline = schemas.NoDeadline | ||
| } | ||
| nestedCtx := schemas.NewBifrostContext(bifrostCtx, deadline) | ||
| nestedCtx.SetValue(schemas.BifrostContextKeyRequestID, newRequestID) | ||
| if originalRequestID != "" { | ||
| nestedCtx.SetValue(schemas.BifrostContextKeyParentMCPRequestID, originalRequestID) | ||
| } | ||
|
|
||
| // Marshal arguments to JSON for the tool call | ||
| argsJSON, err := sonic.Marshal(args) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to marshal tool arguments: %v", err) | ||
| } | ||
|
|
||
| // Build tool call for MCP request | ||
| toolCall := schemas.ChatAssistantMessageToolCall{ | ||
| ID: schemas.Ptr(newRequestID), | ||
| Function: schemas.ChatAssistantMessageToolCallFunction{ | ||
| Name: schemas.Ptr(toolName), | ||
| Arguments: string(argsJSON), | ||
| }, | ||
| } | ||
|
|
||
| // Create BifrostMCPRequest | ||
| mcpRequest := &schemas.BifrostMCPRequest{ | ||
| RequestType: schemas.MCPRequestTypeChatToolCall, | ||
| ChatAssistantMessageToolCall: &toolCall, | ||
| } | ||
|
|
||
| // Check if plugin pipeline is available | ||
| if m.pluginPipelineProvider == nil { | ||
| // Fallback: execute directly without plugins | ||
| return m.callMCPToolDirect(ctx, client, originalToolName, clientName, toolName, args, appendLog) | ||
| } | ||
|
|
||
| // Get plugin pipeline and run hooks | ||
| pipeline := m.pluginPipelineProvider() | ||
| if pipeline == nil { | ||
| // Fallback: execute directly if pipeline is nil | ||
| return m.callMCPToolDirect(ctx, client, originalToolName, clientName, toolName, args, appendLog) | ||
| } | ||
| defer m.releasePluginPipeline(pipeline) | ||
|
|
||
| // Run PreMCPHooks | ||
| preReq, shortCircuit, preCount := pipeline.RunMCPPreHooks(nestedCtx, mcpRequest) | ||
|
|
||
| // Handle short-circuit cases | ||
| if shortCircuit != nil { | ||
| if shortCircuit.Response != nil { | ||
| finalResp, _ := pipeline.RunMCPPostHooks(nestedCtx, shortCircuit.Response, nil, preCount) | ||
| if finalResp != nil && finalResp.ChatMessage != nil { | ||
| return extractResultFromChatMessage(finalResp.ChatMessage), nil | ||
| } | ||
| return nil, fmt.Errorf("plugin short-circuit returned invalid response") | ||
| } | ||
| if shortCircuit.Error != nil { | ||
| pipeline.RunMCPPostHooks(nestedCtx, nil, shortCircuit.Error, preCount) | ||
| if shortCircuit.Error.Error != nil { | ||
| return nil, fmt.Errorf("%s", shortCircuit.Error.Error.Message) | ||
| } | ||
| return nil, fmt.Errorf("plugin short-circuit error") | ||
| } | ||
| } | ||
|
|
||
| // If pre-hooks modified the request, extract updated tool name and args | ||
| if preReq != nil && preReq.ChatAssistantMessageToolCall != nil { | ||
| toolCall = *preReq.ChatAssistantMessageToolCall | ||
| if toolCall.Function.Arguments != "" { | ||
| // Re-parse arguments if they were modified | ||
| if err := sonic.Unmarshal([]byte(toolCall.Function.Arguments), &args); err != nil { | ||
| logger.Warn(fmt.Sprintf("%s Failed to parse modified tool arguments, using original: %v", CodeModeLogPrefix, err)) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // ==================== EXECUTE TOOL ==================== | ||
|
|
||
| // Capture start time for latency calculation | ||
| startTime := time.Now() | ||
|
|
||
| // Derive tool name from originalToolName (ignore pre-hook modifications to tool name) | ||
| // Pre-hooks should not modify which tool gets called, only arguments | ||
| toolNameToCall := originalToolName | ||
|
|
||
| // Call the tool via MCP client | ||
| callRequest := mcp.CallToolRequest{ | ||
| Request: mcp.Request{ | ||
| Method: string(mcp.MethodToolsCall), | ||
| }, | ||
| Params: mcp.CallToolParams{ | ||
| Name: toolNameToCall, | ||
| Arguments: args, | ||
| }, | ||
| } | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
|
|
||
| // Create timeout context | ||
| toolExecutionTimeout := m.toolExecutionTimeout.Load().(time.Duration) | ||
| toolCtx, cancel := context.WithTimeout(nestedCtx, toolExecutionTimeout) | ||
| defer cancel() | ||
|
|
||
| toolResponse, callErr := client.Conn.CallTool(toolCtx, callRequest) | ||
|
|
||
| // Calculate latency | ||
| latency := time.Since(startTime).Milliseconds() | ||
|
|
||
| // ==================== PREPARE RESPONSE FOR POST-HOOKS ==================== | ||
|
|
||
| var mcpResp *schemas.BifrostMCPResponse | ||
| var bifrostErr *schemas.BifrostError | ||
|
|
||
| if callErr != nil { | ||
| logger.Debug(fmt.Sprintf("%s Tool call failed: %s.%s - %v", CodeModeLogPrefix, clientName, toolName, callErr)) | ||
| appendLog(fmt.Sprintf("[TOOL] %s.%s error: %v", clientName, toolName, callErr)) | ||
| bifrostErr = &schemas.BifrostError{ | ||
| IsBifrostError: false, | ||
| Error: &schemas.ErrorField{ | ||
| Message: fmt.Sprintf("tool call failed for %s.%s: %v", clientName, toolName, callErr), | ||
| }, | ||
| } | ||
| } else { | ||
| // Extract result | ||
| rawResult := extractTextFromMCPResponse(toolResponse, toolName) | ||
|
|
||
| // Check if this is an error result (from NewToolResultError) | ||
| // Error results start with "Error: " prefix | ||
| if after, ok := strings.CutPrefix(rawResult, "Error: "); ok { | ||
| errorMsg := after | ||
| logger.Debug(fmt.Sprintf("%s Tool returned error result: %s.%s - %s", CodeModeLogPrefix, clientName, toolName, errorMsg)) | ||
| appendLog(fmt.Sprintf("[TOOL] %s.%s error result: %s", clientName, toolName, errorMsg)) | ||
| bifrostErr = &schemas.BifrostError{ | ||
| IsBifrostError: false, | ||
| Error: &schemas.ErrorField{ | ||
| Message: errorMsg, | ||
| }, | ||
| } | ||
| } else { | ||
| // Success case - create response | ||
| mcpResp = &schemas.BifrostMCPResponse{ | ||
| ChatMessage: createToolResponseMessage(toolCall, rawResult), | ||
| ExtraFields: schemas.BifrostMCPResponseExtraFields{ | ||
| ToolName: *toolCall.Function.Name, | ||
| Latency: latency, | ||
| }, | ||
| } | ||
|
Comment on lines
+866
to
+945
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reset tool name after pre-hooks to keep metadata consistent. 🛠️ Suggested fix if preReq != nil && preReq.ChatAssistantMessageToolCall != nil {
toolCall = *preReq.ChatAssistantMessageToolCall
+ // Keep metadata aligned with the executed tool (ignore pre-hook name mutations)
+ toolCall.Function.Name = schemas.Ptr(toolName)
if toolCall.Function.Arguments != "" {
// Re-parse arguments if they were modified
if err := sonic.Unmarshal([]byte(toolCall.Function.Arguments), &args); err != nil {
logger.Warn(fmt.Sprintf("%s Failed to parse modified tool arguments, using original: %v", CodeModeLogPrefix, err))
}
}
}🤖 Prompt for AI Agents |
||
|
|
||
| // Log the result | ||
| resultStr := formatResultForLog(rawResult) | ||
| appendLog(fmt.Sprintf("[TOOL] %s.%s raw response: %s", clientName, toolName, resultStr)) | ||
| } | ||
| } | ||
|
|
||
| // ==================== RUN POST-HOOKS ==================== | ||
|
|
||
| finalResp, finalErr := pipeline.RunMCPPostHooks(nestedCtx, mcpResp, bifrostErr, preCount) | ||
|
|
||
| // Return result | ||
| if finalErr != nil { | ||
| if finalErr.Error != nil { | ||
| return nil, fmt.Errorf("%s", finalErr.Error.Message) | ||
| } | ||
| return nil, fmt.Errorf("tool execution failed") | ||
| } | ||
|
|
||
| if finalResp == nil || finalResp.ChatMessage == nil { | ||
| return nil, fmt.Errorf("plugin post-hooks returned invalid response") | ||
| } | ||
|
|
||
| // Extract and parse the final result from the chat message | ||
| return extractResultFromChatMessage(finalResp.ChatMessage), nil | ||
| } | ||
|
|
||
| // callMCPToolDirect executes an MCP tool call directly without plugin hooks. | ||
| // This is used as a fallback when the plugin pipeline is not available or context is not BifrostContext. | ||
| func (m *ToolsManager) callMCPToolDirect(ctx context.Context, client *schemas.MCPClientState, originalToolName, clientName, toolName string, args map[string]interface{}, appendLog func(string)) (interface{}, error) { | ||
| // Call the tool via MCP client | ||
| callRequest := mcp.CallToolRequest{ | ||
| Request: mcp.Request{ | ||
|
|
@@ -816,6 +1022,24 @@ func (m *ToolsManager) callMCPTool(ctx context.Context, clientName, toolName str | |
| return finalResult, nil | ||
| } | ||
|
|
||
| // extractResultFromChatMessage extracts the result from a chat message and parses it as JSON if possible. | ||
| func extractResultFromChatMessage(msg *schemas.ChatMessage) interface{} { | ||
| if msg == nil || msg.Content == nil || msg.Content.ContentStr == nil { | ||
| return nil | ||
| } | ||
|
|
||
| rawResult := *msg.Content.ContentStr | ||
|
|
||
| // Try to parse as JSON, otherwise use as string | ||
| var finalResult interface{} | ||
| if err := sonic.Unmarshal([]byte(rawResult), &finalResult); err != nil { | ||
| // Not JSON, use as string | ||
| return rawResult | ||
| } | ||
|
|
||
| return finalResult | ||
| } | ||
|
|
||
| // HELPER FUNCTIONS | ||
|
|
||
| // formatResultForLog formats a result value for logging purposes. | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.