Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 19 additions & 49 deletions src/client/streamableHttp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,46 +124,6 @@ describe("StreamableHTTPClientTransport", () => {
expect(errorSpy).toHaveBeenCalled();
});

it("should handle session termination via DELETE request", async () => {
// First set the session ID by mocking initialization
(global.fetch as jest.Mock).mockResolvedValueOnce({
ok: true,
status: 200,
headers: new Headers({ "mcp-session-id": "session-to-terminate" }),
});

await transport.send({
jsonrpc: "2.0",
method: "initialize",
params: {
clientInfo: { name: "test-client", version: "1.0" },
protocolVersion: "2025-03-26"
},
id: "init-id"
} as JSONRPCMessage);

// Mock DELETE request for session termination
(global.fetch as jest.Mock).mockResolvedValueOnce({
ok: true,
status: 200,
headers: new Headers()
});

const closeSpy = jest.fn();
transport.onclose = closeSpy;

await transport.close();

// Check that DELETE request was sent
const calls = (global.fetch as jest.Mock).mock.calls;
const lastCall = calls[calls.length - 1];
expect(lastCall[1].method).toBe("DELETE");
// The headers may be a plain object in tests
expect(lastCall[1].headers["mcp-session-id"]).toBe("session-to-terminate");

expect(closeSpy).toHaveBeenCalled();
});

it("should handle non-streaming JSON response", async () => {
const message: JSONRPCMessage = {
jsonrpc: "2.0",
Expand Down Expand Up @@ -201,7 +161,10 @@ describe("StreamableHTTPClientTransport", () => {
statusText: "Method Not Allowed"
});

// We expect the 405 error to be caught and handled gracefully
// This should not throw an error that breaks the transport
await transport.start();
await expect(transport.openSseStream()).rejects.toThrow('Failed to open SSE stream: Method Not Allowed');

// Check that GET was attempted
expect(global.fetch).toHaveBeenCalledWith(
Expand Down Expand Up @@ -246,6 +209,7 @@ describe("StreamableHTTPClientTransport", () => {
transport.onmessage = messageSpy;

await transport.start();
await transport.openSseStream();

// Give time for the SSE event to be processed
await new Promise(resolve => setTimeout(resolve, 50));
Expand Down Expand Up @@ -273,7 +237,7 @@ describe("StreamableHTTPClientTransport", () => {

(global.fetch as jest.Mock)
.mockResolvedValueOnce({
ok: true,
ok: true,
status: 200,
headers: new Headers({ "content-type": "text/event-stream" }),
body: makeStream("request1")
Expand All @@ -295,16 +259,21 @@ describe("StreamableHTTPClientTransport", () => {
]);

// Give time for SSE processing
await new Promise(resolve => setTimeout(resolve, 50));
await new Promise(resolve => setTimeout(resolve, 100));

// Both streams should have delivered their messages
expect(messageSpy).toHaveBeenCalledTimes(2);
expect(messageSpy).toHaveBeenCalledWith(
expect.objectContaining({ result: { id: "request1" }, id: "request1" })
);
expect(messageSpy).toHaveBeenCalledWith(
expect.objectContaining({ result: { id: "request2" }, id: "request2" })
);

// Verify received messages without assuming specific order
expect(messageSpy.mock.calls.some(call => {
const msg = call[0];
return msg.id === "request1" && msg.result?.id === "request1";
})).toBe(true);

expect(messageSpy.mock.calls.some(call => {
const msg = call[0];
return msg.id === "request2" && msg.result?.id === "request2";
})).toBe(true);
});

it("should include last-event-id header when resuming a broken connection", async () => {
Expand All @@ -326,6 +295,7 @@ describe("StreamableHTTPClientTransport", () => {
});

await transport.start();
await transport.openSseStream();
await new Promise(resolve => setTimeout(resolve, 50));

// Now simulate attempting to reconnect
Expand All @@ -336,7 +306,7 @@ describe("StreamableHTTPClientTransport", () => {
body: null
});

await transport.start();
await transport.openSseStream();

// Check that Last-Event-ID was included
const calls = (global.fetch as jest.Mock).mock.calls;
Expand Down
176 changes: 67 additions & 109 deletions src/client/streamableHttp.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import { Transport } from "../shared/transport.js";
import { JSONRPCMessage, JSONRPCMessageSchema } from "../types.js";
import { auth, AuthResult, OAuthClientProvider, UnauthorizedError } from "./auth.js";
import { type ErrorEvent } from "eventsource";

import { EventSourceMessage, EventSourceParserStream } from 'eventsource-parser/stream';
export class StreamableHTTPError extends Error {
constructor(
public readonly code: number | undefined,
message: string | undefined,
public readonly event: ErrorEvent,
) {
super(`Streamable HTTP error: ${message}`);
}
Expand Down Expand Up @@ -45,7 +43,7 @@ export type StreamableHTTPClientTransportOptions = {
* for receiving messages.
*/
export class StreamableHTTPClientTransport implements Transport {
private _activeStreams: Map<string, ReadableStreamDefaultReader<Uint8Array>> = new Map();
private _activeStreams: Map<string, ReadableStreamDefaultReader<EventSourceMessage>> = new Map();
private _abortController?: AbortController;
private _url: URL;
private _requestInit?: RequestInit;
Expand Down Expand Up @@ -83,7 +81,7 @@ export class StreamableHTTPClientTransport implements Transport {
throw new UnauthorizedError();
}

return await this._startOrAuth();
return await this._startOrAuthStandaloneSSE();
}

private async _commonHeaders(): Promise<HeadersInit> {
Expand All @@ -102,7 +100,7 @@ export class StreamableHTTPClientTransport implements Transport {
return headers;
}

private async _startOrAuth(): Promise<void> {
private async _startOrAuthStandaloneSSE(): Promise<void> {
try {
// Try to open an initial SSE stream with GET to listen for server messages
// This is optional according to the spec - server may not support it
Expand All @@ -121,32 +119,76 @@ export class StreamableHTTPClientTransport implements Transport {
signal: this._abortController?.signal,
});

if (response.status === 405 || response.status === 404) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still need to handle 405s here, no?

// Server doesn't support GET for SSE, which is allowed by the spec
// We'll rely on SSE responses to POST requests for communication
return;
}

if (!response.ok) {
if (response.status === 401 && this._authProvider) {
// Need to authenticate
return await this._authThenStart();
}

const error = new Error(`Failed to open SSE stream: ${response.status} ${response.statusText}`);
const error = new StreamableHTTPError(
response.status,
`Failed to open SSE stream: ${response.statusText}`,
);
this.onerror?.(error);
throw error;
}

// Successful connection, handle the SSE stream as a standalone listener
const streamId = `initial-${Date.now()}`;
const streamId = `standalone-sse-${Date.now()}`;
this._handleSseStream(response.body, streamId);
} catch (error) {
this.onerror?.(error as Error);
throw error;
}
}

private _handleSseStream(stream: ReadableStream<Uint8Array> | null, streamId: string): void {
if (!stream) {
return;
}

// Create a pipeline: binary stream -> text decoder -> SSE parser
const eventStream = stream
.pipeThrough(new TextDecoderStream())
.pipeThrough(new EventSourceParserStream());

const reader = eventStream.getReader();
this._activeStreams.set(streamId, reader);

const processStream = async () => {
try {
while (true) {
const { done, value: event } = await reader.read();
if (done) {
this._activeStreams.delete(streamId);
break;
}

// Update last event ID if provided
if (event.id) {
this._lastEventId = event.id;
}

// Handle message events (default event type is undefined per docs)
// or explicit 'message' event type
if (!event.event || event.event === 'message') {
try {
const message = JSONRPCMessageSchema.parse(JSON.parse(event.data));
this.onmessage?.(message);
} catch (error) {
this.onerror?.(error as Error);
}
}
}
} catch (error) {
this._activeStreams.delete(streamId);
this.onerror?.(error as Error);
}
};

processStream();
}

async start() {
if (this._activeStreams.size > 0) {
throw new Error(
Expand All @@ -155,7 +197,6 @@ export class StreamableHTTPClientTransport implements Transport {
}

this._abortController = new AbortController();
return await this._startOrAuth();
}

/**
Expand Down Expand Up @@ -186,30 +227,6 @@ export class StreamableHTTPClientTransport implements Transport {
// Abort any pending requests
this._abortController?.abort();

// If we have a session ID, send a DELETE request to explicitly terminate the session
if (this._sessionId) {
try {
const commonHeaders = await this._commonHeaders();
const response = await fetch(this._url, {
method: "DELETE",
headers: commonHeaders,
signal: this._abortController?.signal,
});

if (!response.ok) {
// Server might respond with 405 if it doesn't support explicit session termination
// We don't throw an error in that case
if (response.status !== 405) {
const text = await response.text().catch(() => null);
throw new Error(`Error terminating session (HTTP ${response.status}): ${text}`);
}
}
} catch (error) {
// We still want to invoke onclose even if the session termination fails
this.onerror?.(error as Error);
}
}

this.onclose?.();
}

Expand Down Expand Up @@ -295,76 +312,17 @@ export class StreamableHTTPClientTransport implements Transport {
}
}

private _handleSseStream(stream: ReadableStream<Uint8Array> | null, streamId: string): void {
if (!stream) {
return;
/**
* Opens SSE stream to receive messages from the server.
*
* This allows the server to push messages to the client without requiring the client
* to first send a request via HTTP POST. Some servers may not support this feature.
* If authentication is required but fails, this method will throw an UnauthorizedError.
*/
async openSseStream(): Promise<void> {
if (!this._abortController) {
this._abortController = new AbortController();
}

// Set up stream handling for server-sent events
const reader = stream.getReader();
this._activeStreams.set(streamId, reader);
const decoder = new TextDecoder();
let buffer = '';

const processStream = async () => {
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
// Stream closed by server
this._activeStreams.delete(streamId);
break;
}

buffer += decoder.decode(value, { stream: true });

// Process SSE messages in the buffer
const events = buffer.split('\n\n');
buffer = events.pop() || '';

for (const event of events) {
const lines = event.split('\n');
let id: string | undefined;
let eventType: string | undefined;
let data: string | undefined;

// Parse SSE message according to the format
for (const line of lines) {
if (line.startsWith('id:')) {
id = line.slice(3).trim();
} else if (line.startsWith('event:')) {
eventType = line.slice(6).trim();
} else if (line.startsWith('data:')) {
data = line.slice(5).trim();
}
}

// Update last event ID if provided by server
// As per spec: the ID MUST be globally unique across all streams within that session
if (id) {
this._lastEventId = id;
}

// Handle message event
if (data) {
// Default event type is 'message' per SSE spec if not specified
if (!eventType || eventType === 'message') {
try {
const message = JSONRPCMessageSchema.parse(JSON.parse(data));
this.onmessage?.(message);
} catch (error) {
this.onerror?.(error as Error);
}
}
}
}
}
} catch (error) {
this._activeStreams.delete(streamId);
this.onerror?.(error as Error);
}
};

processStream();
await this._startOrAuthStandaloneSSE();
}
}