Skip to content

Commit fb07d66

Browse files
Convert EverythingServer to use Streamable HTTP (#709)
* Convert EverythingServer to use Streamable HTTP * Apply suggestion from @halter73 Co-authored-by: Stephen Halter <[email protected]> --------- Co-authored-by: Stephen Halter <[email protected]>
1 parent a918960 commit fb07d66

File tree

8 files changed

+177
-37
lines changed

8 files changed

+177
-37
lines changed
Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
<Project Sdk="Microsoft.NET.Sdk">
1+
<Project Sdk="Microsoft.NET.Sdk.Web">
22

33
<PropertyGroup>
44
<TargetFramework>net9.0</TargetFramework>
@@ -8,14 +8,13 @@
88
</PropertyGroup>
99

1010
<ItemGroup>
11-
<PackageReference Include="Microsoft.Extensions.Hosting" />
1211
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" />
1312
<PackageReference Include="OpenTelemetry.Extensions.Hosting" />
1413
<PackageReference Include="OpenTelemetry.Instrumentation.Http" />
1514
</ItemGroup>
1615

1716
<ItemGroup>
18-
<ProjectReference Include="..\..\src\ModelContextProtocol\ModelContextProtocol.csproj" />
17+
<ProjectReference Include="..\..\src\ModelContextProtocol.AspNetCore\ModelContextProtocol.AspNetCore.csproj" />
1918
</ItemGroup>
2019

2120
</Project>
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
@HostAddress = http://localhost:3001
2+
3+
POST {{HostAddress}}/
4+
Accept: application/json, text/event-stream
5+
Content-Type: application/json
6+
7+
{
8+
"jsonrpc": "2.0",
9+
"id": 1,
10+
"method": "initialize",
11+
"params": {
12+
"clientInfo": {
13+
"name": "RestClient",
14+
"version": "0.1.0"
15+
},
16+
"capabilities": {},
17+
"protocolVersion": "2025-06-18"
18+
}
19+
}
20+
21+
###
22+
23+
@SessionId = ZwwM0VFEtKNOMBsP8D2VzQ
24+
25+
POST {{HostAddress}}/
26+
Accept: application/json, text/event-stream
27+
Content-Type: application/json
28+
MCP-Protocol-Version: 2025-06-18
29+
Mcp-Session-Id: {{SessionId}}
30+
31+
{
32+
"jsonrpc": "2.0",
33+
"id": 2,
34+
"method": "resources/list"
35+
}
36+
37+
###
38+
39+
@resource_uri = test://direct/text/resource
40+
41+
POST {{HostAddress}}/
42+
Accept: application/json, text/event-stream
43+
Content-Type: application/json
44+
MCP-Protocol-Version: 2025-06-18
45+
Mcp-Session-Id: {{SessionId}}
46+
47+
{
48+
"jsonrpc": "2.0",
49+
"id": 3,
50+
"method": "resources/subscribe",
51+
"params": {
52+
"uri": "{{resource_uri}}"
53+
}
54+
}
55+
56+
###
57+
58+
POST {{HostAddress}}/
59+
Accept: application/json, text/event-stream
60+
Content-Type: application/json
61+
MCP-Protocol-Version: 2025-06-18
62+
Mcp-Session-Id: {{SessionId}}
63+
64+
{
65+
"jsonrpc": "2.0",
66+
"id": 4,
67+
"method": "resources/unsubscribe",
68+
"params": {
69+
"uri": "{{resource_uri}}"
70+
}
71+
}
72+
73+
###
74+
75+
DELETE {{HostAddress}}/
76+
MCP-Protocol-Version: 2025-06-18
77+
Mcp-Session-Id: {{SessionId}}

samples/EverythingServer/LoggingUpdateMessageSender.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
namespace EverythingServer;
77

8-
public class LoggingUpdateMessageSender(McpServer server, Func<LoggingLevel> getMinLevel) : BackgroundService
8+
public class LoggingUpdateMessageSender(McpServer server) : BackgroundService
99
{
1010
readonly Dictionary<LoggingLevel, string> _loggingLevelMap = new()
1111
{
@@ -23,15 +23,15 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
2323
{
2424
while (!stoppingToken.IsCancellationRequested)
2525
{
26-
var newLevel = (LoggingLevel)Random.Shared.Next(_loggingLevelMap.Count);
26+
var msgLevel = (LoggingLevel)Random.Shared.Next(_loggingLevelMap.Count);
2727

2828
var message = new
2929
{
30-
Level = newLevel.ToString().ToLower(),
31-
Data = _loggingLevelMap[newLevel],
30+
Level = msgLevel.ToString().ToLower(),
31+
Data = _loggingLevelMap[msgLevel],
3232
};
3333

34-
if (newLevel > getMinLevel())
34+
if (msgLevel > server.LoggingLevel)
3535
{
3636
await server.SendNotificationAsync("notifications/message", message, cancellationToken: stoppingToken);
3737
}

samples/EverythingServer/Program.cs

Lines changed: 52 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,6 @@
33
using EverythingServer.Resources;
44
using EverythingServer.Tools;
55
using Microsoft.Extensions.AI;
6-
using Microsoft.Extensions.DependencyInjection;
7-
using Microsoft.Extensions.Hosting;
8-
using Microsoft.Extensions.Logging;
96
using ModelContextProtocol;
107
using ModelContextProtocol.Protocol;
118
using ModelContextProtocol.Server;
@@ -14,20 +11,46 @@
1411
using OpenTelemetry.Metrics;
1512
using OpenTelemetry.Resources;
1613
using OpenTelemetry.Trace;
14+
using System.Collections.Concurrent;
1715

18-
var builder = Host.CreateApplicationBuilder(args);
19-
builder.Logging.AddConsole(consoleLogOptions =>
20-
{
21-
// Configure all logs to go to stderr
22-
consoleLogOptions.LogToStandardErrorThreshold = LogLevel.Trace;
23-
});
16+
var builder = WebApplication.CreateBuilder(args);
2417

25-
HashSet<string> subscriptions = [];
26-
var _minimumLoggingLevel = LoggingLevel.Debug;
18+
// Dictionary of session IDs to a set of resource URIs they are subscribed to
19+
// The value is a ConcurrentDictionary used as a thread-safe HashSet
20+
// because .NET does not have a built-in concurrent HashSet
21+
ConcurrentDictionary<string, ConcurrentDictionary<string, byte>> subscriptions = new();
2722

2823
builder.Services
2924
.AddMcpServer()
30-
.WithStdioServerTransport()
25+
.WithHttpTransport(options =>
26+
{
27+
// Add a RunSessionHandler to remove all subscriptions for the session when it ends
28+
options.RunSessionHandler = async (httpContext, mcpServer, token) =>
29+
{
30+
if (mcpServer.SessionId == null)
31+
{
32+
// There is no sessionId if the serverOptions.Stateless is true
33+
await mcpServer.RunAsync(token);
34+
return;
35+
}
36+
try
37+
{
38+
subscriptions[mcpServer.SessionId] = new ConcurrentDictionary<string, byte>();
39+
// Start an instance of SubscriptionMessageSender for this session
40+
using var subscriptionSender = new SubscriptionMessageSender(mcpServer, subscriptions[mcpServer.SessionId]);
41+
await subscriptionSender.StartAsync(token);
42+
// Start an instance of LoggingUpdateMessageSender for this session
43+
using var loggingSender = new LoggingUpdateMessageSender(mcpServer);
44+
await loggingSender.StartAsync(token);
45+
await mcpServer.RunAsync(token);
46+
}
47+
finally
48+
{
49+
// This code runs when the session ends
50+
subscriptions.TryRemove(mcpServer.SessionId, out _);
51+
}
52+
};
53+
})
3154
.WithTools<AddTool>()
3255
.WithTools<AnnotatedMessageTool>()
3356
.WithTools<EchoTool>()
@@ -40,11 +63,13 @@
4063
.WithResources<SimpleResourceType>()
4164
.WithSubscribeToResourcesHandler(async (ctx, ct) =>
4265
{
43-
var uri = ctx.Params?.Uri;
44-
45-
if (uri is not null)
66+
if (ctx.Server.SessionId == null)
4667
{
47-
subscriptions.Add(uri);
68+
throw new McpException("Cannot add subscription for server with null SessionId");
69+
}
70+
if (ctx.Params?.Uri is { } uri)
71+
{
72+
subscriptions[ctx.Server.SessionId].TryAdd(uri, 0);
4873

4974
await ctx.Server.SampleAsync([
5075
new ChatMessage(ChatRole.System, "You are a helpful test server"),
@@ -62,10 +87,13 @@ await ctx.Server.SampleAsync([
6287
})
6388
.WithUnsubscribeFromResourcesHandler(async (ctx, ct) =>
6489
{
65-
var uri = ctx.Params?.Uri;
66-
if (uri is not null)
90+
if (ctx.Server.SessionId == null)
91+
{
92+
throw new McpException("Cannot remove subscription for server with null SessionId");
93+
}
94+
if (ctx.Params?.Uri is { } uri)
6795
{
68-
subscriptions.Remove(uri);
96+
subscriptions[ctx.Server.SessionId].TryRemove(uri, out _);
6997
}
7098
return new EmptyResult();
7199
})
@@ -126,13 +154,13 @@ await ctx.Server.SampleAsync([
126154
throw new McpProtocolException("Missing required argument 'level'", McpErrorCode.InvalidParams);
127155
}
128156

129-
_minimumLoggingLevel = ctx.Params.Level;
157+
// The SDK updates the LoggingLevel field of the IMcpServer
130158

131159
await ctx.Server.SendNotificationAsync("notifications/message", new
132160
{
133161
Level = "debug",
134162
Logger = "test-server",
135-
Data = $"Logging level set to {_minimumLoggingLevel}",
163+
Data = $"Logging level set to {ctx.Params.Level}",
136164
}, cancellationToken: ct);
137165

138166
return new EmptyResult();
@@ -145,10 +173,8 @@ await ctx.Server.SampleAsync([
145173
.WithLogging(b => b.SetResourceBuilder(resource))
146174
.UseOtlpExporter();
147175

148-
builder.Services.AddSingleton(subscriptions);
149-
builder.Services.AddHostedService<SubscriptionMessageSender>();
150-
builder.Services.AddHostedService<LoggingUpdateMessageSender>();
176+
var app = builder.Build();
151177

152-
builder.Services.AddSingleton<Func<LoggingLevel>>(_ => () => _minimumLoggingLevel);
178+
app.MapMcp();
153179

154-
await builder.Build().RunAsync();
180+
app.Run();
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
{
2+
"$schema": "https://json.schemastore.org/launchsettings.json",
3+
"profiles": {
4+
"http": {
5+
"commandName": "Project",
6+
"dotnetRunMessages": true,
7+
"applicationUrl": "http://localhost:3001",
8+
"environmentVariables": {
9+
"ASPNETCORE_ENVIRONMENT": "Development",
10+
}
11+
},
12+
"https": {
13+
"commandName": "Project",
14+
"dotnetRunMessages": true,
15+
"applicationUrl": "https://localhost:7133;http://localhost:3001",
16+
"environmentVariables": {
17+
"ASPNETCORE_ENVIRONMENT": "Development",
18+
}
19+
}
20+
}
21+
}

samples/EverythingServer/SubscriptionMessageSender.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1-
using Microsoft.Extensions.Hosting;
1+
using System.Collections.Concurrent;
22
using ModelContextProtocol;
33
using ModelContextProtocol.Server;
44

5-
internal class SubscriptionMessageSender(McpServer server, HashSet<string> subscriptions) : BackgroundService
5+
internal class SubscriptionMessageSender(McpServer server, ConcurrentDictionary<string, byte> subscriptions) : BackgroundService
66
{
77
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
88
{
99
while (!stoppingToken.IsCancellationRequested)
1010
{
11-
foreach (var uri in subscriptions)
11+
foreach (var uri in subscriptions.Keys)
1212
{
1313
await server.SendNotificationAsync("notifications/resource/updated",
1414
new
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
{
2+
"Logging": {
3+
"LogLevel": {
4+
"Default": "Information",
5+
"Microsoft.AspNetCore": "Warning"
6+
}
7+
}
8+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{
2+
"Logging": {
3+
"LogLevel": {
4+
"Default": "Information",
5+
"Microsoft.AspNetCore": "Warning"
6+
}
7+
},
8+
"AllowedHosts": "*"
9+
}

0 commit comments

Comments
 (0)