Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions example/streamable_https/test_keepalive_client.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';

void main() async {
print('MCP Keep-Alive Test Client');
print('This client will connect to the server and monitor SSE keep-alive messages\n');

try {
// First, initialize the session
print('1. Initializing session...');
final initClient = HttpClient();
final initRequest = await initClient.postUrl(Uri.parse('http://localhost:3001/mcp'));

initRequest.headers.set('Content-Type', 'application/json');
initRequest.headers.set('Accept', 'application/json, text/event-stream');

initRequest.write(jsonEncode({
'jsonrpc': '2.0',
'method': 'initialize',
'params': {
'protocolVersion': '0.1.0',
'capabilities': {
'roots': {'listChanged': true},
'sampling': {},
},
'clientInfo': {
'name': 'test-client',
'version': '1.0.0',
},
},
'id': 1,
}));

final initResponse = await initRequest.close();
final sessionId = initResponse.headers.value('mcp-session-id');

if (sessionId == null) {
print('ERROR: No session ID received');
return;
}

print('Session initialized with ID: $sessionId');

// Read the SSE stream
final responseBody = StringBuffer();
await for (final chunk in initResponse.transform(utf8.decoder)) {
responseBody.write(chunk);
// Print each SSE event as it arrives
final lines = chunk.split('\n');
for (final line in lines) {
if (line.trim().isNotEmpty) {
print('SSE: $line');
}
}
}

// Now establish SSE connection
print('\n2. Establishing SSE connection to monitor keep-alive messages...');
final sseClient = HttpClient();
final sseRequest = await sseClient.getUrl(Uri.parse('http://localhost:3001/mcp'));

sseRequest.headers.set('Accept', 'text/event-stream');
sseRequest.headers.set('mcp-session-id', sessionId);

final sseResponse = await sseRequest.close();

if (sseResponse.statusCode != 200) {
print('ERROR: SSE connection failed with status ${sseResponse.statusCode}');
return;
}

print('SSE connection established, monitoring for keep-alive messages...');
print('(Keep-alive messages should appear every 5 seconds)\n');

// Create a timer to track time
final startTime = DateTime.now();
Timer.periodic(Duration(seconds: 1), (timer) {
final elapsed = DateTime.now().difference(startTime).inSeconds;
stdout.write('\rElapsed time: ${elapsed}s');
});

// Monitor the SSE stream
int keepAliveCount = 0;
await for (final chunk in sseResponse.transform(utf8.decoder)) {
final lines = chunk.split('\n');
for (final line in lines) {
if (line.startsWith(':')) {
// This is a comment/keep-alive message
keepAliveCount++;
print('\n[Keep-Alive #$keepAliveCount] $line');
stdout.write('Elapsed time: ${DateTime.now().difference(startTime).inSeconds}s');
} else if (line.trim().isNotEmpty) {
// Other SSE messages
print('\n[SSE Event] $line');
}
}
}

} catch (e) {
print('Error: $e');
}
}
112 changes: 112 additions & 0 deletions example/streamable_https/test_keepalive_server.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';

import 'package:mcp_dart/mcp_dart.dart';

void main() async {
// Create HTTP server
final server = await HttpServer.bind(InternetAddress.anyIPv4, 3001);
print('MCP Keep-Alive Test Server listening on port 3001');
print('Keep-alive messages will be sent every 5 seconds');

// Create the MCP server
final mcpServer = McpServer(
Implementation(name: 'keepalive-test-server', version: '1.0.0'),
);

// Register a simple tool
mcpServer.tool(
'test-keepalive',
description: 'A tool that waits to test keep-alive',
toolInputSchema: ToolInputSchema(
properties: {
'delay': {
'type': 'number',
'description': 'Delay in seconds before responding',
'default': 10,
},
},
),
callback: ({args, extra}) async {
final delay = (args?['delay'] as num? ?? 10).toInt();
print('Tool called, waiting $delay seconds...');

// Send periodic notifications during the wait
for (int i = 0; i < delay; i++) {
await extra?.sendNotification(JsonRpcLoggingMessageNotification(
logParams: LoggingMessageNotificationParams(
level: LoggingLevel.info,
data: 'Waiting... ${i + 1}/$delay seconds',
),
));
await Future.delayed(Duration(seconds: 1));
}

return CallToolResult.fromContent(
content: [
TextContent(text: 'Completed after $delay seconds!'),
],
);
},
);

await for (final request in server) {
// Set CORS headers
request.response.headers.set('Access-Control-Allow-Origin', '*');
request.response.headers.set('Access-Control-Allow-Methods', 'GET, POST, DELETE, OPTIONS');
request.response.headers.set('Access-Control-Allow-Headers',
'Origin, Content-Type, Accept, mcp-session-id');
request.response.headers.set('Access-Control-Expose-Headers', 'mcp-session-id');

if (request.method == 'OPTIONS') {
request.response.statusCode = HttpStatus.ok;
await request.response.close();
continue;
}

if (request.uri.path != '/mcp') {
request.response
..statusCode = HttpStatus.notFound
..write('Not Found')
..close();
continue;
}

try {
// Create transport with keep-alive enabled (5 seconds for testing)
final transport = StreamableHTTPServerTransport(
options: StreamableHTTPServerTransportOptions(
sessionIdGenerator: () => generateUUID(),
keepAliveInterval: 5, // Send keep-alive every 5 seconds
enableJsonResponse: false, // Use SSE
),
);

// Connect transport to MCP server
await mcpServer.connect(transport);

// Log transport events
transport.onclose = () {
print('Transport closed');
};

transport.onerror = (error) {
print('Transport error: $error');
};

// Handle the request
await transport.handleRequest(request);

print('Request handled, SSE stream should be active with keep-alive');
} catch (error) {
print('Error handling request: $error');
if (!request.response.headers.contentType.toString().contains('event-stream')) {
request.response
..statusCode = HttpStatus.internalServerError
..write('Internal Server Error')
..close();
}
}
}
}
83 changes: 80 additions & 3 deletions lib/src/server/streamable_https.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';

import 'package:mcp_dart/src/shared/uuid.dart';

import '../shared/uuid.dart';
import '../shared/transport.dart';
import '../types.dart';

Expand Down Expand Up @@ -60,12 +59,18 @@ class StreamableHTTPServerTransportOptions {
/// If provided, resumability will be enabled, allowing clients to reconnect and resume messages
final EventStore? eventStore;

/// Interval in seconds for sending SSE keep-alive messages.
/// Set to null to disable keep-alive messages.
/// Default is 25 seconds (recommended to prevent client timeouts).
final int? keepAliveInterval;

/// Creates configuration options for StreamableHTTPServerTransport
StreamableHTTPServerTransportOptions({
this.sessionIdGenerator,
this.onsessioninitialized,
this.enableJsonResponse = false,
this.eventStore,
this.keepAliveInterval = 25,
});
}

Expand Down Expand Up @@ -120,6 +125,8 @@ class StreamableHTTPServerTransport implements Transport {
final String _standaloneSseStreamId = '_GET_stream';
final EventStore? _eventStore;
final void Function(String sessionId)? _onsessioninitialized;
final int? _keepAliveInterval;
final Map<String, Timer> _keepAliveTimers = {};

@override
String? sessionId;
Expand All @@ -139,7 +146,8 @@ class StreamableHTTPServerTransport implements Transport {
}) : _sessionIdGenerator = options.sessionIdGenerator,
_enableJsonResponse = options.enableJsonResponse,
_eventStore = options.eventStore,
_onsessioninitialized = options.onsessioninitialized;
_onsessioninitialized = options.onsessioninitialized,
_keepAliveInterval = options.keepAliveInterval;

/// Starts the transport. This is required by the Transport interface but is a no-op
/// for the Streamable HTTP transport as connections are managed per-request.
Expand Down Expand Up @@ -241,9 +249,13 @@ class StreamableHTTPServerTransport implements Transport {
// Assign the response to the standalone SSE stream
_streamMapping[_standaloneSseStreamId] = req.response;

// Start keep-alive timer for this SSE connection
_startKeepAliveTimer(_standaloneSseStreamId, req.response);

// Set up close handler for client disconnects
req.response.done.then((_) {
_streamMapping.remove(_standaloneSseStreamId);
_stopKeepAliveTimer(_standaloneSseStreamId);
});
}

Expand Down Expand Up @@ -282,6 +294,15 @@ class StreamableHTTPServerTransport implements Transport {
);

_streamMapping[streamId] = res;

// Start keep-alive timer for this resumed SSE connection
_startKeepAliveTimer(streamId, res);

// Set up close handler for client disconnects
res.done.then((_) {
_streamMapping.remove(streamId);
_stopKeepAliveTimer(streamId);
});
} catch (error) {
onerror?.call(error is Error ? error : StateError(error.toString()));
}
Expand All @@ -306,6 +327,50 @@ class StreamableHTTPServerTransport implements Transport {
}
}

/// Writes a keep-alive comment to the SSE stream
bool _writeKeepAlive(HttpResponse res) {
try {
// SSE comment format - lines starting with ':' are ignored by clients
final timestamp = DateTime.now().toUtc().toIso8601String();
res.write(': keep-alive $timestamp\n\n');
res.flush();
return true;
} catch (e) {
// Connection closed, timer will be cleaned up
return false;
}
}

/// Starts a keep-alive timer for the given stream
void _startKeepAliveTimer(String streamId, HttpResponse response) {
// Only start timer if keep-alive is enabled
final keepAliveInterval = _keepAliveInterval;
if (keepAliveInterval == null || keepAliveInterval <= 0) {
return;
}

// Cancel any existing timer for this stream
_keepAliveTimers[streamId]?.cancel();

// Create new timer
_keepAliveTimers[streamId] = Timer.periodic(
Duration(seconds: keepAliveInterval),
(timer) {
if (!_writeKeepAlive(response)) {
// Connection closed, cancel timer
timer.cancel();
_keepAliveTimers.remove(streamId);
}
},
);
}

/// Stops the keep-alive timer for the given stream
void _stopKeepAliveTimer(String streamId) {
_keepAliveTimers[streamId]?.cancel();
_keepAliveTimers.remove(streamId);
}

/// Handles unsupported requests (PUT, PATCH, etc.)
Future<void> _handleUnsupportedRequest(HttpResponse res) async {
res.statusCode = HttpStatus.methodNotAllowed;
Expand Down Expand Up @@ -503,9 +568,15 @@ class StreamableHTTPServerTransport implements Transport {
}
}

// Start keep-alive timer for SSE streams only
if (!_enableJsonResponse) {
_startKeepAliveTimer(streamId, req.response);
}

// Set up close handler for client disconnects
req.response.done.then((_) {
_streamMapping.remove(streamId);
_stopKeepAliveTimer(streamId);
});

// Handle each message
Expand Down Expand Up @@ -618,6 +689,12 @@ class StreamableHTTPServerTransport implements Transport {

@override
Future<void> close() async {
// Cancel all keep-alive timers
for (final timer in _keepAliveTimers.values) {
timer.cancel();
}
_keepAliveTimers.clear();

// Close all SSE connections - fix concurrent modification by creating a copy of the values first
final responses = List<HttpResponse>.from(_streamMapping.values);
for (final response in responses) {
Expand Down