Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 70 additions & 5 deletions src/server/streamableHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,15 @@ export class StreamableHTTPServerTransport implements Transport {
res.on("close", () => {
this._streamMapping.delete(this._standaloneSseStreamId);
});

// Add error handler for standalone SSE stream
res.on("error", (error) => {
if ((error as NodeJS.ErrnoException).code === 'ERR_STREAM_WRITE_AFTER_END') {
this._streamMapping.delete(this._standaloneSseStreamId);
} else {
this.onerror?.(error as Error);
}
});
}

/**
Expand All @@ -327,13 +336,27 @@ export class StreamableHTTPServerTransport implements Transport {

const streamId = await this._eventStore?.replayEventsAfter(lastEventId, {
send: async (eventId: string, message: JSONRPCMessage) => {
// Check stream state before writing
if (res.destroyed || res.writableEnded || !res.writable) {
res.end();
return;
}
if (!this.writeSSEEvent(res, message, eventId)) {
this.onerror?.(new Error("Failed replay events"));
res.end();
}
}
});
this._streamMapping.set(streamId, res);

// Add error handler for replay stream
res.on("error", (error) => {
if ((error as NodeJS.ErrnoException).code === 'ERR_STREAM_WRITE_AFTER_END') {
this._streamMapping.delete(streamId);
} else {
this.onerror?.(error as Error);
}
});
} catch (error) {
this.onerror?.(error as Error);
}
Expand All @@ -343,14 +366,26 @@ export class StreamableHTTPServerTransport implements Transport {
* Writes an event to the SSE stream with proper formatting
*/
private writeSSEEvent(res: ServerResponse, message: JSONRPCMessage, eventId?: string): boolean {
// Check if stream is still writable to prevent ERR_STREAM_WRITE_AFTER_END
if (res.destroyed || res.writableEnded || !res.writable) {
return false;
}

let eventData = `event: message\n`;
// Include event ID if provided - this is important for resumability
if (eventId) {
eventData += `id: ${eventId}\n`;
}
eventData += `data: ${JSON.stringify(message)}\n\n`;

return res.write(eventData);
try {
return res.write(eventData);
} catch (error) {
if ((error as NodeJS.ErrnoException).code === 'ERR_STREAM_WRITE_AFTER_END') {
return false;
}
throw error;
}
}

/**
Expand Down Expand Up @@ -509,17 +544,32 @@ export class StreamableHTTPServerTransport implements Transport {
}
// Store the response for this request to send messages back through this connection
// We need to track by request ID to maintain the connection
const requestIds: RequestId[] = [];
for (const message of messages) {
if (isJSONRPCRequest(message)) {
this._streamMapping.set(streamId, res);
this._requestToStreamMapping.set(message.id, streamId);
requestIds.push(message.id);
}
}
// Set up close handler for client disconnects
res.on("close", () => {
this._streamMapping.delete(streamId);
});

// Add error handler for stream write errors
res.on("error", (error) => {
if ((error as NodeJS.ErrnoException).code === 'ERR_STREAM_WRITE_AFTER_END') {
this._streamMapping.delete(streamId);
// Clean up all request mappings for this stream
for (const reqId of requestIds) {
this._requestToStreamMapping.delete(reqId);
}
} else {
this.onerror?.(error as Error);
}
});

// handle each message
for (const message of messages) {
this.onmessage?.(message, { authInfo, requestInfo });
Expand Down Expand Up @@ -681,7 +731,11 @@ export class StreamableHTTPServerTransport implements Transport {
}

// Send the message to the standalone SSE stream
this.writeSSEEvent(standaloneSse, message, eventId);
const writeSuccess = this.writeSSEEvent(standaloneSse, message, eventId);
if (!writeSuccess) {
// Clean up if write failed
this._streamMapping.delete(this._standaloneSseStreamId);
}
return;
}

Expand All @@ -692,16 +746,27 @@ export class StreamableHTTPServerTransport implements Transport {
throw new Error(`No connection established for request ID: ${String(requestId)}`);
}

// Check if response stream is still valid and writable
if (!response || response.destroyed || response.writableEnded || !response.writable) {
// Clean up mappings for ended streams
this._streamMapping.delete(streamId);
this._requestToStreamMapping.delete(requestId);
return;
}

if (!this._enableJsonResponse) {
// For SSE responses, generate event ID if event store is provided
let eventId: string | undefined;

if (this._eventStore) {
eventId = await this._eventStore.storeEvent(streamId, message);
}
if (response) {
// Write the event to the response stream
this.writeSSEEvent(response, message, eventId);
// Write the event to the response stream (now safe due to validation above)
const writeSuccess = this.writeSSEEvent(response, message, eventId);
if (!writeSuccess) {
// Clean up if write failed
this._streamMapping.delete(streamId);
this._requestToStreamMapping.delete(requestId);
}
}

Expand Down
Loading