Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/ModelContextProtocol/Client/McpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
namespace ModelContextProtocol.Client;

/// <inheritdoc/>
internal sealed class McpClient : McpJsonRpcEndpoint, IMcpClient
internal sealed class McpClient : McpEndpoint, IMcpClient
{
private readonly IClientTransport _clientTransport;
private readonly McpClientOptions _options;
Expand Down
5 changes: 0 additions & 5 deletions src/ModelContextProtocol/Configuration/McpServerConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,6 @@ public record McpServerConfig
/// </summary>
public string? Location { get; set; }

/// <summary>
/// Arguments (if any) to pass to the executable.
/// </summary>
public string[]? Arguments { get; init; }

/// <summary>
/// Additional transport-specific configuration.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public async Task SendMessageAsync(IJsonRpcMessage message, CancellationToken ca
throw new InvalidOperationException($"Transport is not connected. Make sure to call {nameof(RunAsync)} first.");
}

await _outgoingSseChannel.Writer.WriteAsync(new SseItem<IJsonRpcMessage?>(message), cancellationToken);
// Emit redundant "event: message" lines for better compatibility with other SDKs.
await _outgoingSseChannel.Writer.WriteAsync(new SseItem<IJsonRpcMessage?>(message, "message"), cancellationToken);
}

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion src/ModelContextProtocol/Server/McpServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
namespace ModelContextProtocol.Server;

/// <inheritdoc />
internal sealed class McpServer : McpJsonRpcEndpoint, IMcpServer
internal sealed class McpServer : McpEndpoint, IMcpServer
{
private readonly EventHandler? _toolsChangedDelegate;
private readonly EventHandler? _promptsChangedDelegate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace ModelContextProtocol.Shared;
/// This is especially true as a client represents a connection to one and only one server, and vice versa.
/// Any multi-client or multi-server functionality should be implemented at a higher level of abstraction.
/// </summary>
internal abstract class McpJsonRpcEndpoint : IAsyncDisposable
internal abstract class McpEndpoint : IAsyncDisposable
{
private readonly RequestHandlers _requestHandlers = [];
private readonly NotificationHandlers _notificationHandlers = [];
Expand All @@ -29,10 +29,10 @@ internal abstract class McpJsonRpcEndpoint : IAsyncDisposable
protected readonly ILogger _logger;

/// <summary>
/// Initializes a new instance of the <see cref="McpJsonRpcEndpoint"/> class.
/// Initializes a new instance of the <see cref="McpEndpoint"/> class.
/// </summary>
/// <param name="loggerFactory">The logger factory.</param>
protected McpJsonRpcEndpoint(ILoggerFactory? loggerFactory = null)
protected McpEndpoint(ILoggerFactory? loggerFactory = null)
{
_logger = loggerFactory?.CreateLogger(GetType()) ?? NullLogger.Instance;
}
Expand All @@ -57,7 +57,7 @@ public Task SendMessageAsync(IJsonRpcMessage message, CancellationToken cancella
/// <summary>
/// Task that processes incoming messages from the transport.
/// </summary>
protected Task? MessageProcessingTask { get; set; }
protected Task? MessageProcessingTask { get; private set; }

[MemberNotNull(nameof(MessageProcessingTask))]
protected void StartSession(ITransport sessionTransport)
Expand Down
4 changes: 2 additions & 2 deletions src/ModelContextProtocol/Shared/McpSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ internal sealed class McpSession : IDisposable
private readonly ConcurrentDictionary<RequestId, CancellationTokenSource> _handlingRequests = new();
private readonly JsonSerializerOptions _jsonOptions;
private readonly ILogger _logger;

private readonly string _id = Guid.NewGuid().ToString("N");
private long _nextRequestId;

Expand Down Expand Up @@ -371,7 +371,7 @@ public async Task SendMessageAsync(IJsonRpcMessage message, CancellationToken ca
default:
return JsonSerializer.Deserialize(
JsonSerializer.Serialize(notificationParams, McpJsonUtilities.DefaultOptions.GetTypeInfo<object?>()),
McpJsonUtilities.DefaultOptions.GetTypeInfo<CancelledNotification>());
McpJsonUtilities.DefaultOptions.GetTypeInfo<CancelledNotification>());
}
}
catch
Expand Down
50 changes: 50 additions & 0 deletions tests/ModelContextProtocol.Tests/SseServerIntegrationTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using ModelContextProtocol.Client;
using ModelContextProtocol.Protocol.Types;
using ModelContextProtocol.Tests.Utils;
using System.Net;
using System.Text;

namespace ModelContextProtocol.Tests;

Expand Down Expand Up @@ -30,6 +32,12 @@ private Task<IMcpClient> GetClientAsync(McpClientOptions? options = null)
cancellationToken: TestContext.Current.CancellationToken);
}

private HttpClient GetHttpClient() =>
new()
{
BaseAddress = new(_fixture.DefaultConfig.Location!),
};

[Fact]
public async Task ConnectAndPing_Sse_TestServer()
{
Expand Down Expand Up @@ -271,4 +279,46 @@ public async Task CallTool_Sse_EchoServer_Concurrently()
Assert.Equal($"Echo: Hello MCP! {i}", textContent.Text);
}
}

[Fact]
public async Task EventSourceStream_Includes_MessageEventType()
{
// Simulate our own MCP client handshake using a plain HttpClient so we can look for "event: message"
// in the raw SSE response stream which is not exposed by the real MCP client.
using var httpClient = GetHttpClient();
await using var sseResponse = await httpClient.GetStreamAsync("", TestContext.Current.CancellationToken);
using var streamReader = new StreamReader(sseResponse);

// read event stream from the server
var endpointEvent = await streamReader.ReadLineAsync(TestContext.Current.CancellationToken);
Assert.Equal("event: endpoint", endpointEvent);

var endpointData = await streamReader.ReadLineAsync(TestContext.Current.CancellationToken);
Assert.NotNull(endpointData);
Assert.StartsWith("data: ", endpointData);
var messageEndpoint = endpointData["data: ".Length..];

const string initializeRequest = """
{"jsonrpc":"2.0","id":"1","method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{},"clientInfo":{"name":"IntegrationTestClient","version":"1.0.0"}}}
""";
using (var initializeRequestBody = new StringContent(initializeRequest, Encoding.UTF8, "application/json"))
{
var response = await httpClient.PostAsync(messageEndpoint, initializeRequestBody, TestContext.Current.CancellationToken);
Assert.Equal(HttpStatusCode.Accepted, response.StatusCode);
}

const string initializeNotification = """
{"jsonrpc":"2.0","method":"notifications/initialized"}
""";
using (var initializeNotificationBody = new StringContent(initializeNotification, Encoding.UTF8, "application/json"))
{
var response = await httpClient.PostAsync(messageEndpoint, initializeNotificationBody, TestContext.Current.CancellationToken);
Assert.Equal(HttpStatusCode.Accepted, response.StatusCode);
}

// read event stream from the server
Assert.Equal("", await streamReader.ReadLineAsync(TestContext.Current.CancellationToken));
var messageEvent = await streamReader.ReadLineAsync(TestContext.Current.CancellationToken);
Assert.Equal("event: message", messageEvent);
}
}