Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/ModelContextProtocol/Client/McpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
namespace ModelContextProtocol.Client;

/// <inheritdoc/>
internal sealed class McpClient : McpJsonRpcEndpoint, IMcpClient
internal sealed class McpClient : McpEndpoint, IMcpClient
{
private readonly IClientTransport _clientTransport;
private readonly McpClientOptions _options;
Expand Down
5 changes: 0 additions & 5 deletions src/ModelContextProtocol/Configuration/McpServerConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,6 @@ public record McpServerConfig
/// </summary>
public string? Location { get; set; }

/// <summary>
/// Arguments (if any) to pass to the executable.
/// </summary>
public string[]? Arguments { get; init; }

/// <summary>
/// Additional transport-specific configuration.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ public async Task SendMessageAsync(IJsonRpcMessage message, CancellationToken ca
throw new InvalidOperationException($"Transport is not connected. Make sure to call {nameof(RunAsync)} first.");
}

await _outgoingSseChannel.Writer.WriteAsync(new SseItem<IJsonRpcMessage?>(message), cancellationToken);
// Emit redundant "event: message" lines for better compatibility with other SDKs.
await _outgoingSseChannel.Writer.WriteAsync(new SseItem<IJsonRpcMessage?>(message, SseParser.EventTypeDefault), cancellationToken);
}

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion src/ModelContextProtocol/Server/McpServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
namespace ModelContextProtocol.Server;

/// <inheritdoc />
internal sealed class McpServer : McpJsonRpcEndpoint, IMcpServer
internal sealed class McpServer : McpEndpoint, IMcpServer
{
private readonly EventHandler? _toolsChangedDelegate;
private readonly EventHandler? _promptsChangedDelegate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace ModelContextProtocol.Shared;
/// This is especially true as a client represents a connection to one and only one server, and vice versa.
/// Any multi-client or multi-server functionality should be implemented at a higher level of abstraction.
/// </summary>
internal abstract class McpJsonRpcEndpoint : IAsyncDisposable
internal abstract class McpEndpoint : IAsyncDisposable
{
private readonly RequestHandlers _requestHandlers = [];
private readonly NotificationHandlers _notificationHandlers = [];
Expand All @@ -31,10 +31,10 @@ internal abstract class McpJsonRpcEndpoint : IAsyncDisposable
protected readonly ILogger _logger;

/// <summary>
/// Initializes a new instance of the <see cref="McpJsonRpcEndpoint"/> class.
/// Initializes a new instance of the <see cref="McpEndpoint"/> class.
/// </summary>
/// <param name="loggerFactory">The logger factory.</param>
protected McpJsonRpcEndpoint(ILoggerFactory? loggerFactory = null)
protected McpEndpoint(ILoggerFactory? loggerFactory = null)
{
_logger = loggerFactory?.CreateLogger(GetType()) ?? NullLogger.Instance;
}
Expand Down Expand Up @@ -64,7 +64,7 @@ public Task SendMessageAsync(IJsonRpcMessage message, CancellationToken cancella
/// <summary>
/// Task that processes incoming messages from the transport.
/// </summary>
protected Task? MessageProcessingTask { get; set; }
protected Task? MessageProcessingTask { get; private set; }

[MemberNotNull(nameof(MessageProcessingTask))]
protected void StartSession(ITransport sessionTransport)
Expand Down
2 changes: 1 addition & 1 deletion src/ModelContextProtocol/Shared/McpSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ internal sealed class McpSession : IDisposable
/// </summary>
private readonly ConcurrentDictionary<RequestId, CancellationTokenSource> _handlingRequests = new();
private readonly ILogger _logger;

private readonly string _id = Guid.NewGuid().ToString("N");
private long _nextRequestId;

Expand Down
48 changes: 48 additions & 0 deletions tests/ModelContextProtocol.Tests/SseServerIntegrationTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using ModelContextProtocol.Client;
using ModelContextProtocol.Protocol.Types;
using ModelContextProtocol.Tests.Utils;
using System.Net;
using System.Text;

namespace ModelContextProtocol.Tests;

Expand Down Expand Up @@ -30,6 +32,12 @@ private Task<IMcpClient> GetClientAsync(McpClientOptions? options = null)
cancellationToken: TestContext.Current.CancellationToken);
}

private HttpClient GetHttpClient() =>
new()
{
BaseAddress = new(_fixture.DefaultConfig.Location!),
};

[Fact]
public async Task ConnectAndPing_Sse_TestServer()
{
Expand Down Expand Up @@ -271,4 +279,44 @@ public async Task CallTool_Sse_EchoServer_Concurrently()
Assert.Equal($"Echo: Hello MCP! {i}", textContent.Text);
}
}

[Fact]
public async Task EventSourceStream_Includes_MessageEventType()
{
// Simulate our own MCP client handshake using a plain HttpClient so we can look for "event: message"
// in the raw SSE response stream which is not exposed by the real MCP client.
using var httpClient = GetHttpClient();
await using var sseResponse = await httpClient.GetStreamAsync("", TestContext.Current.CancellationToken);
using var streamReader = new StreamReader(sseResponse);

var endpointEvent = await streamReader.ReadLineAsync(TestContext.Current.CancellationToken);
Assert.Equal("event: endpoint", endpointEvent);

var endpointData = await streamReader.ReadLineAsync(TestContext.Current.CancellationToken);
Assert.NotNull(endpointData);
Assert.StartsWith("data: ", endpointData);
var messageEndpoint = endpointData["data: ".Length..];

const string initializeRequest = """
{"jsonrpc":"2.0","id":"1","method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{},"clientInfo":{"name":"IntegrationTestClient","version":"1.0.0"}}}
""";
using (var initializeRequestBody = new StringContent(initializeRequest, Encoding.UTF8, "application/json"))
{
var response = await httpClient.PostAsync(messageEndpoint, initializeRequestBody, TestContext.Current.CancellationToken);
Assert.Equal(HttpStatusCode.Accepted, response.StatusCode);
}

const string initializedNotification = """
{"jsonrpc":"2.0","method":"notifications/initialized"}
""";
using (var initializedNotificationBody = new StringContent(initializedNotification, Encoding.UTF8, "application/json"))
{
var response = await httpClient.PostAsync(messageEndpoint, initializedNotificationBody, TestContext.Current.CancellationToken);
Assert.Equal(HttpStatusCode.Accepted, response.StatusCode);
}

Assert.Equal("", await streamReader.ReadLineAsync(TestContext.Current.CancellationToken));
var messageEvent = await streamReader.ReadLineAsync(TestContext.Current.CancellationToken);
Assert.Equal("event: message", messageEvent);
}
}