diff --git a/client/sse.go b/client/sse.go index ae2ebcaf0..07512a9be 100644 --- a/client/sse.go +++ b/client/sse.go @@ -23,12 +23,10 @@ func WithHTTPClient(httpClient *http.Client) transport.ClientOption { // NewSSEMCPClient creates a new SSE-based MCP client with the given base URL. // Returns an error if the URL is invalid. func NewSSEMCPClient(baseURL string, options ...transport.ClientOption) (*Client, error) { - sseTransport, err := transport.NewSSE(baseURL, options...) if err != nil { return nil, fmt.Errorf("failed to create SSE transport: %w", err) } - return NewClient(sseTransport), nil } diff --git a/client/transport/sse.go b/client/transport/sse.go index 92f1de416..70a391905 100644 --- a/client/transport/sse.go +++ b/client/transport/sse.go @@ -16,6 +16,7 @@ import ( "time" "github.com/mark3labs/mcp-go/mcp" + "github.com/mark3labs/mcp-go/util" ) // SSE implements the transport layer of the MCP protocol using Server-Sent Events (SSE). @@ -33,6 +34,7 @@ type SSE struct { endpointChan chan struct{} headers map[string]string headerFunc HTTPHeaderFunc + logger util.Logger started atomic.Bool closed atomic.Bool @@ -47,6 +49,13 @@ type SSE struct { type ClientOption func(*SSE) +// WithSSELogger sets a custom logger for the SSE client. +func WithSSELogger(logger util.Logger) ClientOption { + return func(sc *SSE) { + sc.logger = logger + } +} + func WithHeaders(headers map[string]string) ClientOption { return func(sc *SSE) { sc.headers = headers @@ -85,6 +94,7 @@ func NewSSE(baseURL string, options ...ClientOption) (*SSE, error) { responses: make(map[string]chan *JSONRPCResponse), endpointChan: make(chan struct{}), headers: make(map[string]string), + logger: util.DefaultLogger(), } for _, opt := range options { @@ -104,7 +114,6 @@ func NewSSE(baseURL string, options ...ClientOption) (*SSE, error) { // Start initiates the SSE connection to the server and waits for the endpoint information. // Returns an error if the connection fails or times out waiting for the endpoint. func (c *SSE) Start(ctx context.Context) error { - if c.started.Load() { return fmt.Errorf("has already started") } @@ -113,7 +122,6 @@ func (c *SSE) Start(ctx context.Context) error { c.cancelSSEStream = cancel req, err := http.NewRequestWithContext(ctx, "GET", c.baseURL.String(), nil) - if err != nil { return fmt.Errorf("failed to create request: %w", err) } @@ -220,7 +228,7 @@ func (c *SSE) readSSE(reader io.ReadCloser) { } } if !c.closed.Load() { - fmt.Printf("SSE stream error: %v\n", err) + c.logger.Errorf("SSE stream error: %v", err) } return } @@ -256,11 +264,11 @@ func (c *SSE) handleSSEEvent(event, data string) { case "endpoint": endpoint, err := c.baseURL.Parse(data) if err != nil { - fmt.Printf("Error parsing endpoint URL: %v\n", err) + c.logger.Errorf("Error parsing endpoint URL: %v", err) return } if endpoint.Host != c.baseURL.Host { - fmt.Printf("Endpoint origin does not match connection origin\n") + c.logger.Errorf("Endpoint origin does not match connection origin") return } c.endpoint = endpoint @@ -269,7 +277,7 @@ func (c *SSE) handleSSEEvent(event, data string) { case "message": var baseMessage JSONRPCResponse if err := json.Unmarshal([]byte(data), &baseMessage); err != nil { - fmt.Printf("Error unmarshaling message: %v\n", err) + c.logger.Errorf("Error unmarshaling message: %v", err) return } @@ -321,7 +329,6 @@ func (c *SSE) SendRequest( ctx context.Context, request JSONRPCRequest, ) (*JSONRPCResponse, error) { - if !c.started.Load() { return nil, fmt.Errorf("transport not started yet") } diff --git a/client/transport/sse_test.go b/client/transport/sse_test.go index ca05180c4..31c70887f 100644 --- a/client/transport/sse_test.go +++ b/client/transport/sse_test.go @@ -4,17 +4,17 @@ import ( "context" "encoding/json" "errors" + "fmt" "io" + "net/http" + "net/http/httptest" "strings" "sync" "testing" "time" - "fmt" - "net/http" - "net/http/httptest" - "github.com/mark3labs/mcp-go/mcp" + "github.com/stretchr/testify/require" ) // mockReaderWithError is a mock io.ReadCloser that simulates reading some data @@ -30,18 +30,18 @@ func (m *mockReaderWithError) Read(p []byte) (n int, err error) { if m.closed { return 0, io.EOF } - + if m.position >= len(m.data) { return 0, m.err } - + n = copy(p, m.data[m.position:]) m.position += n - + if m.position >= len(m.data) { return n, m.err } - + return n, nil } @@ -150,7 +150,6 @@ func startMockSSEEchoServer() (string, func()) { flush() } }() - }) // Create a router to handle different endpoints @@ -263,7 +262,6 @@ func TestSSE(t *testing.T) { }) t.Run("SendNotification & NotificationHandler", func(t *testing.T) { - var wg sync.WaitGroup notificationChan := make(chan mcp.JSONRPCNotification, 1) @@ -403,7 +401,6 @@ func TestSSE(t *testing.T) { }) t.Run("ResponseError", func(t *testing.T) { - // Prepare a request request := JSONRPCRequest{ JSONRPC: "2.0", @@ -546,34 +543,34 @@ func TestSSE(t *testing.T) { t.Run("NO_ERROR_WithoutConnectionLostHandler", func(t *testing.T) { // Test that NO_ERROR without connection lost handler maintains backward compatibility // When no connection lost handler is set, NO_ERROR should be treated as a regular error - + // Create a mock Reader that simulates NO_ERROR mockReader := &mockReaderWithError{ data: []byte("event: endpoint\ndata: /message\n\n"), err: errors.New("connection closed: NO_ERROR"), } - + // Create SSE transport url, closeF := startMockSSEEchoServer() defer closeF() - + trans, err := NewSSE(url) if err != nil { t.Fatal(err) } - + // DO NOT set connection lost handler to test backward compatibility - + // Capture stderr to verify the error is printed (backward compatible behavior) // Since we can't easily capture fmt.Printf output in tests, we'll just verify // that the readSSE method returns without calling any handler - + // Directly test the readSSE method with our mock reader go trans.readSSE(mockReader) - + // Wait for readSSE to complete time.Sleep(100 * time.Millisecond) - + // The test passes if readSSE completes without panicking or hanging // In backward compatibility mode, NO_ERROR should be treated as a regular error t.Log("Backward compatibility test passed: NO_ERROR handled as regular error when no handler is set") @@ -583,26 +580,26 @@ func TestSSE(t *testing.T) { // Test that NO_ERROR in HTTP/2 connection loss is properly handled // This test verifies that when a connection is lost in a way that produces // an error message containing "NO_ERROR", the connection lost handler is called - + var connectionLostCalled bool var connectionLostError error var mu sync.Mutex - + // Create a mock Reader that simulates connection loss with NO_ERROR mockReader := &mockReaderWithError{ data: []byte("event: endpoint\ndata: /message\n\n"), err: errors.New("http2: stream closed with error code NO_ERROR"), } - + // Create SSE transport url, closeF := startMockSSEEchoServer() defer closeF() - + trans, err := NewSSE(url) if err != nil { t.Fatal(err) } - + // Set connection lost handler trans.SetConnectionLostHandler(func(err error) { mu.Lock() @@ -610,15 +607,15 @@ func TestSSE(t *testing.T) { connectionLostCalled = true connectionLostError = err }) - + // Directly test the readSSE method with our mock reader that simulates NO_ERROR go trans.readSSE(mockReader) - + // Wait for connection lost handler to be called timeout := time.After(1 * time.Second) ticker := time.NewTicker(10 * time.Millisecond) defer ticker.Stop() - + for { select { case <-timeout: @@ -628,17 +625,17 @@ func TestSSE(t *testing.T) { called := connectionLostCalled err := connectionLostError mu.Unlock() - + if called { if err == nil { t.Fatal("Expected connection lost error, got nil") } - + // Verify that the error contains "NO_ERROR" string if !strings.Contains(err.Error(), "NO_ERROR") { t.Errorf("Expected error to contain 'NO_ERROR', got: %v", err) } - + t.Logf("Connection lost handler called with NO_ERROR: %v", err) return } @@ -649,26 +646,26 @@ func TestSSE(t *testing.T) { t.Run("NO_ERROR_Handling", func(t *testing.T) { // Test specific NO_ERROR string handling in readSSE method // This tests the code path at line 209 where NO_ERROR is checked - + // Create a mock Reader that simulates an error containing "NO_ERROR" mockReader := &mockReaderWithError{ data: []byte("event: endpoint\ndata: /message\n\n"), err: errors.New("connection closed: NO_ERROR"), } - + // Create SSE transport url, closeF := startMockSSEEchoServer() defer closeF() - + trans, err := NewSSE(url) if err != nil { t.Fatal(err) } - + var connectionLostCalled bool var connectionLostError error var mu sync.Mutex - + // Set connection lost handler to verify it's called for NO_ERROR trans.SetConnectionLostHandler(func(err error) { mu.Lock() @@ -676,15 +673,15 @@ func TestSSE(t *testing.T) { connectionLostCalled = true connectionLostError = err }) - + // Directly test the readSSE method with our mock reader go trans.readSSE(mockReader) - + // Wait for connection lost handler to be called timeout := time.After(1 * time.Second) ticker := time.NewTicker(10 * time.Millisecond) defer ticker.Stop() - + for { select { case <-timeout: @@ -694,17 +691,17 @@ func TestSSE(t *testing.T) { called := connectionLostCalled err := connectionLostError mu.Unlock() - + if called { if err == nil { t.Fatal("Expected connection lost error with NO_ERROR, got nil") } - + // Verify that the error contains "NO_ERROR" string if !strings.Contains(err.Error(), "NO_ERROR") { t.Errorf("Expected error to contain 'NO_ERROR', got: %v", err) } - + t.Logf("Successfully handled NO_ERROR: %v", err) return } @@ -714,47 +711,46 @@ func TestSSE(t *testing.T) { t.Run("RegularError_DoesNotTriggerConnectionLost", func(t *testing.T) { // Test that regular errors (not containing NO_ERROR) do not trigger connection lost handler - + // Create a mock Reader that simulates a regular error mockReader := &mockReaderWithError{ data: []byte("event: endpoint\ndata: /message\n\n"), err: errors.New("regular connection error"), } - + // Create SSE transport url, closeF := startMockSSEEchoServer() defer closeF() - + trans, err := NewSSE(url) if err != nil { t.Fatal(err) } - + var connectionLostCalled bool var mu sync.Mutex - + // Set connection lost handler - this should NOT be called for regular errors trans.SetConnectionLostHandler(func(err error) { mu.Lock() defer mu.Unlock() connectionLostCalled = true }) - + // Directly test the readSSE method with our mock reader go trans.readSSE(mockReader) - + // Wait and verify connection lost handler is NOT called time.Sleep(200 * time.Millisecond) - + mu.Lock() called := connectionLostCalled mu.Unlock() - + if called { t.Error("Connection lost handler should not be called for regular errors") } }) - } func TestSSEErrors(t *testing.T) { @@ -871,4 +867,49 @@ func TestSSEErrors(t *testing.T) { } }) + t.Run("SSEStreamErrorLogging", func(t *testing.T) { + logChan := make(chan string, 10) + testLogger := &testLogger{logChan: logChan} + + sseHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "Streaming unsupported", http.StatusInternalServerError) + return + } + + fmt.Fprintf(w, "event: endpoint\ndata: %s\n\n", "/message") + flusher.Flush() + + fmt.Fprintf(w, "event: message\ndata: {invalid json}\n\n") + flusher.Flush() + + time.Sleep(50 * time.Millisecond) + }) + + testServer := httptest.NewServer(sseHandler) + t.Cleanup(testServer.Close) + + trans, err := NewSSE(testServer.URL, WithSSELogger(testLogger)) + require.NoError(t, err) + + // Start the transport + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + t.Cleanup(cancel) + + err = trans.Start(ctx) + require.NoError(t, err) + t.Cleanup(func() { _ = trans.Close() }) + + // Wait for the error log message about unmarshaling + select { + case logMsg := <-logChan: + if !strings.Contains(logMsg, "Error unmarshaling message") { + t.Errorf("Expected error log about unmarshaling message, got: %s", logMsg) + } + case <-time.After(3 * time.Second): + t.Fatal("Timeout waiting for error log message") + } + }) } diff --git a/client/transport/stdio.go b/client/transport/stdio.go index 70418a215..488164c79 100644 --- a/client/transport/stdio.go +++ b/client/transport/stdio.go @@ -12,6 +12,7 @@ import ( "sync" "github.com/mark3labs/mcp-go/mcp" + "github.com/mark3labs/mcp-go/util" ) // Stdio implements the transport layer of the MCP protocol using stdio communication. @@ -37,6 +38,7 @@ type Stdio struct { requestMu sync.RWMutex ctx context.Context ctxMu sync.RWMutex + logger util.Logger } // StdioOption defines a function that configures a Stdio transport instance. @@ -57,6 +59,13 @@ func WithCommandFunc(f CommandFunc) StdioOption { } } +// WithCommandLogger sets a custom logger for the stdio transport. +func WithCommandLogger(logger util.Logger) StdioOption { + return func(s *Stdio) { + s.logger = logger + } +} + // NewIO returns a new stdio-based transport using existing input, output, and // logging streams instead of spawning a subprocess. // This is useful for testing and simulating client behavior. @@ -69,6 +78,7 @@ func NewIO(input io.Reader, output io.WriteCloser, logging io.ReadCloser) *Stdio responses: make(map[string]chan *JSONRPCResponse), done: make(chan struct{}), ctx: context.Background(), + logger: util.DefaultLogger(), } } @@ -102,6 +112,7 @@ func NewStdioWithOptions( responses: make(map[string]chan *JSONRPCResponse), done: make(chan struct{}), ctx: context.Background(), + logger: util.DefaultLogger(), } for _, opt := range opts { @@ -239,7 +250,7 @@ func (c *Stdio) readResponses() { line, err := c.stdout.ReadString('\n') if err != nil { if err != io.EOF && !errors.Is(err, context.Canceled) { - fmt.Printf("Error reading response: %v\n", err) + c.logger.Errorf("Error reading from stdout: %v", err) } return } @@ -429,7 +440,6 @@ func (c *Stdio) handleIncomingRequest(request JSONRPCRequest) { } response, err := handler(ctx, request) - if err != nil { errorResponse := JSONRPCResponse{ JSONRPC: mcp.JSONRPC_VERSION, @@ -457,13 +467,13 @@ func (c *Stdio) handleIncomingRequest(request JSONRPCRequest) { func (c *Stdio) sendResponse(response JSONRPCResponse) { responseBytes, err := json.Marshal(response) if err != nil { - fmt.Printf("Error marshaling response: %v\n", err) + c.logger.Errorf("Error marshaling response: %v", err) return } responseBytes = append(responseBytes, '\n') if _, err := c.stdin.Write(responseBytes); err != nil { - fmt.Printf("Error writing response: %v\n", err) + c.logger.Errorf("Error writing response: %v", err) } } diff --git a/client/transport/stdio_test.go b/client/transport/stdio_test.go index 3c6804f3b..18aa932e8 100644 --- a/client/transport/stdio_test.go +++ b/client/transport/stdio_test.go @@ -5,18 +5,19 @@ import ( "encoding/json" "errors" "fmt" + "io" "os" "os/exec" "path/filepath" "runtime" + "strings" "sync" "syscall" "testing" "time" - "github.com/stretchr/testify/require" - "github.com/mark3labs/mcp-go/mcp" + "github.com/stretchr/testify/require" ) func compileTestServer(outputPath string) error { @@ -508,6 +509,70 @@ func TestStdioErrors(t *testing.T) { t.Errorf("Expected error when sending request after close, got nil") } }) + + t.Run("StdioResponseWritingErrorLogging", func(t *testing.T) { + logChan := make(chan string, 10) + testLogger := &testLogger{logChan: logChan} + + _, stdinWriter := io.Pipe() + stdoutReader, stdoutWriter := io.Pipe() + stderrReader, stderrWriter := io.Pipe() + t.Cleanup(func() { + _ = stdinWriter.Close() + _ = stdoutWriter.Close() + _ = stderrWriter.Close() + }) + + stdio := NewIO(stdoutReader, stdinWriter, stderrReader) + stdio.logger = testLogger + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + t.Cleanup(cancel) + + err := stdio.Start(ctx) + if err != nil { + t.Fatalf("Failed to start stdio transport: %v", err) + } + t.Cleanup(func() { _ = stdio.Close() }) + + stdio.SetRequestHandler(func(ctx context.Context, request JSONRPCRequest) (*JSONRPCResponse, error) { + return &JSONRPCResponse{ + JSONRPC: "2.0", + ID: request.ID, + Result: json.RawMessage(`"test response"`), + }, nil + }) + + doneChan := make(chan struct{}) + go func() { + // Simulate a request coming from the server + request := JSONRPCRequest{ + JSONRPC: "2.0", + ID: mcp.NewRequestId(int64(1)), + Method: "test/method", + } + requestBytes, _ := json.Marshal(request) + requestBytes = append(requestBytes, '\n') + _, _ = stdoutWriter.Write(requestBytes) + + // Close stdin to trigger a write error when the response is sent + time.Sleep(50 * time.Millisecond) // Give time for the request to be processed + _ = stdinWriter.Close() + doneChan <- struct{}{} + }() + + <-doneChan + + // Wait for the error log message + select { + case logMsg := <-logChan: + if !strings.Contains(logMsg, "Error writing response") { + t.Errorf("Expected error log about writing response, got: %s", logMsg) + } + case <-time.After(3 * time.Second): + t.Fatal("Timeout waiting for error log message") + } + }) } func TestStdio_WithCommandFunc(t *testing.T) { diff --git a/client/transport/streamable_http.go b/client/transport/streamable_http.go index f8965553a..268aeb342 100644 --- a/client/transport/streamable_http.go +++ b/client/transport/streamable_http.go @@ -68,12 +68,18 @@ func WithHTTPOAuth(config OAuthConfig) StreamableHTTPCOption { } } -func WithLogger(logger util.Logger) StreamableHTTPCOption { +// WithHTTPLogger sets a custom logger for the StreamableHTTP transport. +func WithHTTPLogger(logger util.Logger) StreamableHTTPCOption { return func(sc *StreamableHTTP) { sc.logger = logger } } +// Deprecated: Use [WithHTTPLogger] instead. +func WithLogger(logger util.Logger) StreamableHTTPCOption { + return WithHTTPLogger(logger) +} + // WithSession creates a client with a pre-configured session func WithSession(sessionID string) StreamableHTTPCOption { return func(sc *StreamableHTTP) { diff --git a/client/transport/streamable_http_test.go b/client/transport/streamable_http_test.go index 4831d5ecc..5208cb9c3 100644 --- a/client/transport/streamable_http_test.go +++ b/client/transport/streamable_http_test.go @@ -523,7 +523,6 @@ func TestStreamableHTTPErrors(t *testing.T) { t.Errorf("Expected error when sending request to non-existent URL, got nil") } }) - } // ---- continuous listening tests ---- @@ -718,7 +717,6 @@ func TestContinuousListening(t *testing.T) { } func TestContinuousListeningMethodNotAllowed(t *testing.T) { - // Start a server that doesn't support GET url, closeServer, _, _ := startMockStreamableWithGETSupport(false) diff --git a/www/docs/pages/clients/transports.mdx b/www/docs/pages/clients/transports.mdx index af25fb65a..1a2e6ddcf 100644 --- a/www/docs/pages/clients/transports.mdx +++ b/www/docs/pages/clients/transports.mdx @@ -6,12 +6,12 @@ Learn about transport-specific client implementations and how to choose the righ MCP-Go provides client implementations for all supported transports. Each transport has different characteristics and is optimized for specific scenarios. -| Transport | Best For | Connection | Real-time | Multi-client | -|-----------|----------|------------|-----------|--------------| -| **STDIO** | CLI tools, desktop apps | Process pipes | No | No | -| **StreamableHTTP** | Web services, APIs | HTTP requests | No | Yes | -| **SSE** | Web apps, real-time | HTTP + EventSource | Yes | Yes | -| **In-Process** | Testing, embedded | Direct calls | Yes | No | +| Transport | Best For | Connection | Real-time | Multi-client | +| ------------------ | ----------------------- | ------------------ | --------- | ------------ | +| **STDIO** | CLI tools, desktop apps | Process pipes | No | No | +| **StreamableHTTP** | Web services, APIs | HTTP requests | No | Yes | +| **SSE** | Web apps, real-time | HTTP + EventSource | Yes | Yes | +| **In-Process** | Testing, embedded | Direct calls | Yes | No | ## STDIO Client @@ -65,6 +65,42 @@ func createStdioClient() { } ``` +### STDIO Client with Custom Configuration + +```go +func createCustomStdioClient() { + // Create custom logger for debugging + logger := myCustomLogger{} + + // Create STDIO client with custom options + c, err := client.NewStdioMCPClientWithOptions( + "go", + []string{"GOCACHE=/tmp/gocache"}, // Custom environment + []string{"run", "/path/to/server/main.go"}, + transport.WithCommandLogger(logger), + transport.WithCommandFunc(func(ctx context.Context, command string, args []string, env []string) (*exec.Cmd, error) { + cmd := exec.CommandContext(ctx, command, args...) + cmd.Env = append(os.Environ(), env...) + cmd.Dir = "/path/to/working/directory" + return cmd, nil + }), + ) + if err != nil { + log.Fatal(err) + } + defer c.Close() + + ctx := context.Background() + + // Initialize connection + if err := c.Initialize(ctx); err != nil { + log.Fatal(err) + } + + // Use the client... +} +``` + ### STDIO Error Handling ```go @@ -175,7 +211,7 @@ func (msc *ManagedStdioClient) monitorProcess() { return case <-msc.restartChan: log.Println("Restarting STDIO client...") - + if msc.client != nil { msc.client.Close() } @@ -219,11 +255,11 @@ func (msc *ManagedStdioClient) CallTool(ctx context.Context, req mcp.CallToolReq func (msc *ManagedStdioClient) Close() error { msc.cancel() msc.wg.Wait() - + if msc.client != nil { return msc.client.Close() } - + return nil } @@ -277,8 +313,12 @@ func createStreamableHTTPClient() { ```go func createCustomStreamableHTTPClient() { + // Create custom logger for debugging + logger := myCustomLogger{} + // Create StreamableHTTP client with options c := client.NewStreamableHttpClient("https://api.example.com/mcp", + transport.WithLogger(logger), transport.WithHTTPTimeout(30*time.Second), transport.WithHTTPHeaders(map[string]string{ "User-Agent": "MyApp/1.0", @@ -390,12 +430,13 @@ func (pool *StreamableHTTPClientPool) CallTool(ctx context.Context, req mcp.Call ``` ### StreamableHTTP With Preconfigured Session + You can also create a StreamableHTTP client with a preconfigured session, which allows you to reuse the same session across multiple requests ```go func createStreamableHTTPClientWithSession() { // Create StreamableHTTP client with options - sessionID := // fetch existing session ID + sessionID := // fetch existing session ID c := client.NewStreamableHttpClient("https://api.example.com/mcp", transport.WithSession(sessionID), ) @@ -405,7 +446,7 @@ func createStreamableHTTPClientWithSession() { // Use client... _, err := c.ListTools(ctx) // If the session is terminated, you must reinitialize the client - if errors.Is(err, transport.ErrSessionTerminated) { + if errors.Is(err, transport.ErrSessionTerminated) { c.Initialize(ctx) // Reinitialize if session is terminated // The session ID should change after reinitialization sessionID = c.GetSessionId() // Update session ID @@ -458,6 +499,40 @@ func createSSEClient() { } ``` +### SSE Client with Custom Configuration + +```go +func createCustomSSEClient() { + // Create custom logger for debugging + logger := myCustomLogger{} + + // Create SSE client with custom options + c, err := client.NewSSEMCPClient("http://localhost:8080/mcp/sse", + transport.WithSSELogger(logger), + transport.WithHeaders(map[string]string{ + "Authorization": "Bearer your-token", + "User-Agent": "MyApp/1.0", + }), + transport.WithHTTPClient(&http.Client{ + Timeout: 30 * time.Second, + }), + ) + if err != nil { + log.Fatal(err) + } + defer c.Close() + + ctx := context.Background() + + // Initialize + if err := c.Initialize(ctx); err != nil { + log.Fatal(err) + } + + // Use client... +} +``` + ### SSE Client with Reconnection ```go @@ -501,7 +576,7 @@ func (rsc *ResilientSSEClient) connect() error { } client := client.NewSSEClient(rsc.baseURL) - + // Set headers for key, value := range rsc.headers { client.SetHeader(key, value) @@ -522,11 +597,11 @@ func (rsc *ResilientSSEClient) reconnectLoop() { return case <-rsc.reconnectCh: log.Println("Reconnecting SSE client...") - + for attempt := 1; attempt <= 5; attempt++ { if err := rsc.connect(); err != nil { log.Printf("Reconnection attempt %d failed: %v", attempt, err) - + backoff := time.Duration(attempt) * time.Second select { case <-time.After(backoff): @@ -578,14 +653,14 @@ func (rsc *ResilientSSEClient) Subscribe(ctx context.Context) (<-chan mcp.Notifi func (rsc *ResilientSSEClient) Close() error { rsc.cancel() - + rsc.mutex.Lock() defer rsc.mutex.Unlock() - + if rsc.client != nil { return rsc.client.Close() } - + return nil } @@ -628,7 +703,7 @@ func (seh *SSEEventHandler) Start() error { seh.wg.Add(1) go func() { defer seh.wg.Done() - + for { select { case notification := <-notifications: @@ -666,7 +741,7 @@ func (seh *SSEEventHandler) OnToolUpdate(handler func(mcp.Notification)) { func (seh *SSEEventHandler) addHandler(method string, handler func(mcp.Notification)) { seh.mutex.Lock() defer seh.mutex.Unlock() - + seh.handlers[method] = append(seh.handlers[method], handler) } @@ -691,7 +766,7 @@ In-process clients provide direct communication with servers in the same process func createInProcessClient() { // Create server s := server.NewMCPServer("Test Server", "1.0.0") - + // Add tools to server s.AddTool( mcp.NewTool("test_tool", @@ -829,16 +904,16 @@ func SelectTransport(req TransportRequirements) string { switch { case !req.NetworkRequired && req.Performance == "high": return "inprocess" - + case !req.NetworkRequired && !req.MultiClient: return "stdio" - + case req.RealTime && req.MultiClient: return "sse" - + case req.NetworkRequired && req.MultiClient: return "streamablehttp" - + default: return "stdio" // Default fallback } @@ -935,12 +1010,12 @@ func (cf *ClientFactory) CreateClient(transport string) (client.Client, error) { if !ok { return nil, fmt.Errorf("streamablehttp config not set") } - + options := []transport.StreamableHTTPCOption{} if len(config.Headers) > 0 { options = append(options, transport.WithHTTPHeaders(config.Headers)) } - + return client.NewStreamableHttpClient(config.BaseURL, options...), nil case "sse": @@ -951,12 +1026,12 @@ func (cf *ClientFactory) CreateClient(transport string) (client.Client, error) { if !ok { return nil, fmt.Errorf("sse config not set") } - + options := []transport.ClientOption{} if len(config.Headers) > 0 { options = append(options, transport.WithHeaders(config.Headers)) } - + return client.NewSSEMCPClient(config.BaseURL, options...) default: @@ -967,7 +1042,7 @@ func (cf *ClientFactory) CreateClient(transport string) (client.Client, error) { // Usage func demonstrateClientFactory() { factory := NewClientFactory() - + // Configure transports factory.SetStdioConfig("go", "run", "server.go") factory.SetStreamableHTTPConfig("http://localhost:8080/mcp", map[string]string{ @@ -993,3 +1068,19 @@ func demonstrateClientFactory() { } ``` +## Logging Configuration + +All client transports support custom logging. +Each transport provides a logger option that accepts any implementation of the `util.Logger` interface. + +```go +type myCustomLogger struct {} + +func (myCustomLogger) Infof(format string, args ...any) { + // TODO +} + +func (myCustomLogger) Errorf(format string, args ...any) { + // TODO +} +```