Skip to content

Commit 9ead95c

Browse files
committed
feat(mcp): implement tool aggregation proxy and fix SSE stability
- Add MCP server proxy that aggregates tools from remote servers - Implement transparent proxying of tool calls to remote servers - Copy tool schemas and parameters correctly from remote tools - Set SSE read timeout to 1200 seconds for long-running operations - Remove complex reconnection logic in favor of native SSE persistence The MCP server now acts as a proxy that: 1. Connects to configured remote MCP servers 2. Aggregates their available tools 3. Exposes these tools locally with proper parameter schemas 4. Proxies tool calls to the appropriate remote server Additionally, fixes SSE connection stability by properly configuring timeouts and trusting the protocol's built-in persistence, rather than implementing custom reconnection logic.
1 parent 603603a commit 9ead95c

File tree

6 files changed

+474
-45
lines changed

6 files changed

+474
-45
lines changed

go.mod

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,22 @@ require (
77
github.com/mattn/go-sqlite3 v1.14.27
88
github.com/mark3labs/mcp-go v0.18.0
99
github.com/rs/zerolog v1.34.0
10+
github.com/stretchr/testify v1.9.0
1011
go.uber.org/fx v1.23.0
1112
)
1213

1314
require (
15+
github.com/davecgh/go-spew v1.1.1 // indirect
16+
github.com/google/uuid v1.6.0 // indirect
1417
github.com/mattn/go-colorable v0.1.14 // indirect
1518
github.com/mattn/go-isatty v0.0.20 // indirect
19+
github.com/pmezard/go-difflib v1.0.0 // indirect
20+
github.com/yosida95/uritemplate/v3 v3.0.2 // indirect
1621
go.uber.org/dig v1.18.1 // indirect
1722
go.uber.org/multierr v1.11.0 // indirect
1823
go.uber.org/zap v1.27.0 // indirect
1924
golang.org/x/sys v0.32.0 // indirect
2025
github.com/google/uuid v1.6.0 // indirect
2126
github.com/yosida95/uritemplate/v3 v3.0.2 // indirect
27+
gopkg.in/yaml.v3 v3.0.1 // indirect
2228
)

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,5 +45,7 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
4545
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
4646
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
4747
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
48+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
49+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
4850
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
4951
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

internal/app/app.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,11 +86,10 @@ var BackendModule = fx.Module("backend",
8686
// MCPClientModule provides the MCP client (distinct from server frontend).
8787
var MCPClientModule = fx.Module("mcp_client",
8888
fx.Provide(
89-
mcp.NewServer,
89+
mcp.NewMCPServer,
90+
mcp.NewSSEServer,
9091
),
91-
fx.Invoke(func(server *mcp.Server, lc fx.Lifecycle) error {
92-
return server.Start(lc)
93-
}),
92+
fx.Invoke(mcp.RegisterMCPServerHooks),
9493
)
9594

9695
// UpdaterModule provides the tool and server updater service.

internal/mcp/client/client.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,17 @@
22
package client
33

44
import (
5+
"time"
6+
57
"github.com/mark3labs/mcp-go/client"
68
)
79

810
// Re-export the SSEMCPClient from mark3labs/mcp-go/client
911
type SSEMCPClient = client.SSEMCPClient
1012

11-
// NewSSEMCPClient creates a new SSE MCP client
12-
var NewSSEMCPClient = client.NewSSEMCPClient
13+
// NewSSEMCPClient creates a new SSE MCP client with a 60 second timeout
14+
func NewSSEMCPClient(url string) (*SSEMCPClient, error) {
15+
return client.NewSSEMCPClient(url,
16+
client.WithSSEReadTimeout(1200*time.Second),
17+
)
18+
}

internal/mcp/client/client_test.go

Lines changed: 143 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ package client
22

33
import (
44
"context"
5+
"encoding/json"
6+
"regexp"
7+
"strings"
58
"sync"
69
"testing"
710
"time"
@@ -21,12 +24,14 @@ func setupMCPClient(t *testing.T, url string) *MCPConnection {
2124
// Create a context for this specific connection
2225
ctx, cancel := context.WithCancel(context.Background())
2326

27+
t.Logf("Creating new SSE client for %s", url)
2428
client, err := NewSSEMCPClient(url)
2529
if err != nil {
2630
cancel()
2731
t.Fatalf("Failed to create client for %s: %v", url, err)
2832
}
2933

34+
t.Logf("Starting SSE client...")
3035
if err := client.Start(ctx); err != nil {
3136
cancel()
3237
t.Fatalf("Failed to start client for %s: %v", url, err)
@@ -40,12 +45,13 @@ func setupMCPClient(t *testing.T, url string) *MCPConnection {
4045
Version: "1.0.0",
4146
}
4247

48+
t.Logf("Initializing SSE client...")
4349
result, err := client.Initialize(ctx, initRequest)
4450
if err != nil {
4551
cancel()
4652
t.Fatalf("Failed to initialize client for %s: %v", url, err)
4753
}
48-
t.Logf("Connected to server at %s: %s", url, result.ServerInfo.Name)
54+
t.Logf("Successfully connected to server at %s: %s", url, result.ServerInfo.Name)
4955

5056
return &MCPConnection{
5157
client: client,
@@ -55,11 +61,27 @@ func setupMCPClient(t *testing.T, url string) *MCPConnection {
5561
}
5662
}
5763

64+
func checkConnection(t *testing.T, conn *MCPConnection) error {
65+
t.Log("Checking connection status...")
66+
// Create a context with a short timeout for the check
67+
checkCtx, cancel := context.WithTimeout(conn.ctx, 5*time.Second)
68+
defer cancel()
69+
70+
// Try a simple ListTools request to verify connection
71+
toolsRequest := mcp.ListToolsRequest{}
72+
_, err := conn.client.ListTools(checkCtx, toolsRequest)
73+
if err != nil {
74+
t.Logf("Connection check failed: %v", err)
75+
return err
76+
}
77+
t.Log("Connection check successful")
78+
return nil
79+
}
80+
5881
func TestMultipleMCPServers(t *testing.T) {
5982
// Define our MCP servers
6083
servers := []string{
61-
"http://0.0.0.0:8000/sse",
62-
}
84+
"http://0.0.0.0:8000/sse"}
6385

6486
// Create connections to all servers
6587
connections := make([]*MCPConnection, 0, len(servers))
@@ -105,3 +127,121 @@ func TestMultipleMCPServers(t *testing.T) {
105127
// Wait for all operations to complete
106128
wg.Wait()
107129
}
130+
131+
func TestBrowserTools(t *testing.T) {
132+
// Setup connection to the MCP server
133+
conn := setupMCPClient(t, "http://0.0.0.0:8000/sse")
134+
defer func() {
135+
t.Log("Cleaning up connection...")
136+
conn.cancel() // Cancel the context first
137+
conn.client.Close() // Then close the client
138+
t.Log("Connection cleanup complete")
139+
}()
140+
141+
// Create a context with timeout for the first operation
142+
ctx1, cancel1 := context.WithTimeout(conn.ctx, 60*time.Second)
143+
defer cancel1()
144+
145+
// First tool call: browser_use
146+
t.Log("Making browser_use tool call...")
147+
browserUseRequest := mcp.CallToolRequest{
148+
Params: struct {
149+
Name string `json:"name"`
150+
Arguments map[string]interface{} `json:"arguments,omitempty"`
151+
Meta *struct {
152+
ProgressToken mcp.ProgressToken `json:"progressToken,omitempty"`
153+
} `json:"_meta,omitempty"`
154+
}{
155+
Name: "browser_use",
156+
Arguments: map[string]interface{}{
157+
"action": "top 5 headlines",
158+
"url": "https://news.google.com/home?hl=en-US&gl=US&ceid=US:en",
159+
},
160+
},
161+
}
162+
163+
browserUseResult, err := conn.client.CallTool(ctx1, browserUseRequest)
164+
if err != nil {
165+
t.Fatalf("Failed to call browser_use: %v", err)
166+
}
167+
168+
// Log the raw result for debugging
169+
t.Logf("Raw browser_use result: %+v", browserUseResult)
170+
171+
// Extract task ID from the result
172+
var taskID string
173+
if len(browserUseResult.Content) > 0 {
174+
content := browserUseResult.Content[0]
175+
// Convert the content to string for regex matching
176+
contentBytes, err := json.Marshal(content)
177+
if err != nil {
178+
t.Fatalf("Failed to marshal content: %v", err)
179+
}
180+
contentStr := string(contentBytes)
181+
182+
// First extract the text field content
183+
textRegex := regexp.MustCompile(`"text":\s*"({[^}]+})"`)
184+
textMatches := textRegex.FindStringSubmatch(contentStr)
185+
if len(textMatches) < 2 {
186+
t.Fatalf("Could not find text content in response: %s", contentStr)
187+
}
188+
189+
// The text field contains escaped JSON, so we need to unescape it
190+
textContent := textMatches[1]
191+
textContent = strings.ReplaceAll(textContent, `\n`, "\n")
192+
textContent = strings.ReplaceAll(textContent, `\"`, `"`)
193+
194+
// Now extract the task_id from the unescaped JSON
195+
taskIDRegex := regexp.MustCompile(`"task_id":\s*"([^"]+)"`)
196+
matches := taskIDRegex.FindStringSubmatch(textContent)
197+
if len(matches) < 2 {
198+
t.Fatalf("Could not find task_id in text content: %s", textContent)
199+
}
200+
taskID = matches[1]
201+
t.Logf("Got task ID: %s", taskID)
202+
} else {
203+
t.Fatalf("No content in result")
204+
}
205+
206+
// Wait 40 seconds before checking result
207+
t.Log("Waiting 40 seconds before checking result...")
208+
time.Sleep(40 * time.Second)
209+
210+
// Create a fresh context for the second operation
211+
ctx2, cancel2 := context.WithTimeout(conn.ctx, 60*time.Second)
212+
defer cancel2()
213+
214+
// Second tool call: browser_get_result
215+
t.Log("Making browser_get_result tool call...")
216+
browserGetResultRequest := mcp.CallToolRequest{
217+
Params: struct {
218+
Name string `json:"name"`
219+
Arguments map[string]interface{} `json:"arguments,omitempty"`
220+
Meta *struct {
221+
ProgressToken mcp.ProgressToken `json:"progressToken,omitempty"`
222+
} `json:"_meta,omitempty"`
223+
}{
224+
Name: "browser_get_result",
225+
Arguments: map[string]interface{}{
226+
"task_id": taskID,
227+
},
228+
},
229+
}
230+
231+
browserGetResultResult, err := conn.client.CallTool(ctx2, browserGetResultRequest)
232+
if err != nil {
233+
t.Fatalf("Failed to call browser_get_result: %v", err)
234+
}
235+
236+
// Log the final result
237+
if len(browserGetResultResult.Content) > 0 {
238+
content := browserGetResultResult.Content[0]
239+
contentBytes, err := json.Marshal(content)
240+
if err != nil {
241+
t.Fatalf("Failed to marshal content: %v", err)
242+
}
243+
t.Logf("Final result: %s", string(contentBytes))
244+
} else {
245+
t.Fatalf("No content in final result")
246+
}
247+
}

0 commit comments

Comments
 (0)