Skip to content

Commit 2d479bb

Browse files
authored
Merge pull request #477 from sunerpy/main
2 parents 56f2501 + bee9f90 commit 2d479bb

File tree

1 file changed

+4
-8
lines changed

1 file changed

+4
-8
lines changed

client/transport/streamable_http.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ type StreamableHTTP struct {
114114

115115
// OAuth support
116116
oauthHandler *OAuthHandler
117+
wg sync.WaitGroup
117118
}
118119

119120
// NewStreamableHTTP creates a new Streamable HTTP transport with the given server URL.
@@ -182,9 +183,10 @@ func (c *StreamableHTTP) Close() error {
182183
sessionId := c.sessionID.Load().(string)
183184
if sessionId != "" {
184185
c.sessionID.Store("")
185-
186+
c.wg.Add(1)
186187
// notify server session closed
187188
go func() {
189+
defer c.wg.Done()
188190
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
189191
defer cancel()
190192
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, c.serverURL.String(), nil)
@@ -201,7 +203,7 @@ func (c *StreamableHTTP) Close() error {
201203
res.Body.Close()
202204
}()
203205
}
204-
206+
c.wg.Wait()
205207
return nil
206208
}
207209

@@ -231,7 +233,6 @@ func (c *StreamableHTTP) SendRequest(
231233
ctx context.Context,
232234
request JSONRPCRequest,
233235
) (*JSONRPCResponse, error) {
234-
235236
// Marshal request
236237
requestBody, err := json.Marshal(request)
237238
if err != nil {
@@ -316,7 +317,6 @@ func (c *StreamableHTTP) sendHTTP(
316317
body io.Reader,
317318
acceptType string,
318319
) (resp *http.Response, err error) {
319-
320320
// Create HTTP request
321321
req, err := http.NewRequestWithContext(ctx, method, c.serverURL.String(), body)
322322
if err != nil {
@@ -374,7 +374,6 @@ func (c *StreamableHTTP) sendHTTP(
374374
// It returns the final result for the request once received, or an error.
375375
// If ignoreResponse is true, it won't return when a response messge is received. This is for continuous listening.
376376
func (c *StreamableHTTP) handleSSEResponse(ctx context.Context, reader io.ReadCloser, ignoreResponse bool) (*JSONRPCResponse, error) {
377-
378377
// Create a channel for this specific request
379378
responseChan := make(chan *JSONRPCResponse, 1)
380379

@@ -387,7 +386,6 @@ func (c *StreamableHTTP) handleSSEResponse(ctx context.Context, reader io.ReadCl
387386
defer close(responseChan)
388387

389388
c.readSSE(ctx, reader, func(event, data string) {
390-
391389
// (unsupported: batching)
392390

393391
var message JSONRPCResponse
@@ -490,7 +488,6 @@ func (c *StreamableHTTP) readSSE(ctx context.Context, reader io.ReadCloser, hand
490488
}
491489

492490
func (c *StreamableHTTP) SendNotification(ctx context.Context, notification mcp.JSONRPCNotification) error {
493-
494491
// Marshal request
495492
requestBody, err := json.Marshal(notification)
496493
if err != nil {
@@ -577,7 +574,6 @@ var (
577574
)
578575

579576
func (c *StreamableHTTP) createGETConnectionToServer(ctx context.Context) error {
580-
581577
resp, err := c.sendHTTP(ctx, http.MethodGet, nil, "text/event-stream")
582578
if err != nil {
583579
return fmt.Errorf("failed to send request: %w", err)

0 commit comments

Comments
 (0)