Stormancer Node 2 Node TCP Transport
- Interlaced requests and processing
- Constant string remote operation ids
- System.IO.Pipelines based request and response streaming
- Request broadcasting across the peer mesh, with responses exposed as an IAsyncEnumerable<Response> (request and response streaming are supported too; however, peers added to the mesh after a broadcast request has started won't process it)
https://www.nuget.org/packages/Stormancer.Tcp
To declare operation handlers, create a class that implements Stormancer.Tcp.ITcpPeerConfigurator and its Configure method:
void ITcpPeerConfigurator.Configure(ITcpPeerApiBuilder builder)
{
    ReadOnlyMemory<byte> seq = TestSequence();
    builder.ConfigureHandler("test", async ctx =>
    {
        // Wait until the whole request body (SEGMENTS * 64 bytes) is available.
        var readResult = await ctx.Reader.ReadAtLeastAsync(SEGMENTS * 64);
        ReadOnlyMemory<byte> array = readResult.Buffer.Slice(0, SEGMENTS * 64).ToArray();
        Debug.Assert(seq.Span.SequenceEqual(array.Span));
        ctx.Reader.AdvanceTo(readResult.Buffer.End);
        ctx.Reader.Complete();

        // Echo the payload back, one 64-byte segment at a time.
        for (int i = 0; i < SEGMENTS; i++)
        {
            await ctx.Writer.WriteAsync(array.Slice(64 * i, 64));
        }
        ctx.Writer.Complete();
    });
}
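The handler above relies on the usual System.IO.Pipelines read pattern: wait for a minimum number of bytes, copy them out, advance the reader, then complete it. The following is a minimal sketch of that pattern only, with an in-memory Pipe standing in for the network. It does not use Stormancer.Tcp at all. The Segments value and the byte pattern replacing TestSequence() are assumptions made for illustration, since the README does not define them.

```csharp
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Linq;
using System.Threading.Tasks;

const int Segments = 4;      // assumed value; SEGMENTS is not defined in the README
const int SegmentSize = 64;

// Hypothetical stand-in for TestSequence(): a deterministic byte pattern.
ReadOnlyMemory<byte> seq = Enumerable.Range(0, Segments * SegmentSize)
    .Select(i => (byte)(i % 256))
    .ToArray();

// An in-memory pipe plays the role of the TCP stream.
var pipe = new Pipe();

// Writer side: push the payload one 64-byte segment at a time, then complete.
var writing = Task.Run(async () =>
{
    for (int i = 0; i < Segments; i++)
    {
        await pipe.Writer.WriteAsync(seq.Slice(SegmentSize * i, SegmentSize));
    }
    await pipe.Writer.CompleteAsync();
});

// Reader side: wait until the whole payload has been buffered.
ReadResult readResult = await pipe.Reader.ReadAtLeastAsync(Segments * SegmentSize);
ReadOnlyMemory<byte> received = readResult.Buffer.Slice(0, Segments * SegmentSize).ToArray();

// Mark everything as consumed, then release the reader.
pipe.Reader.AdvanceTo(readResult.Buffer.End);
await pipe.Reader.CompleteAsync();
await writing;

bool roundTripOk = seq.Span.SequenceEqual(received.Span);
Console.WriteLine(roundTripOk);
```

Copying the buffer with ToArray() before calling AdvanceTo matters here: once the reader has advanced, the memory behind readResult.Buffer may be reused by the pipe.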
var server = new TcpPeer("test-server", Log, () => new ITcpPeerConfigurator[] { configurator }, _ => { }, _ => Task.CompletedTask, _ => { }, (_, _) => { });
//For the sample, we start a server on a random port by providing 0 as the port.
_ = server.RunAsync(new System.Net.IPEndPoint(IPAddress.Any, 0), CancellationToken.None);
var client = new TcpPeer("test-client", Log, () => Enumerable.Empty<ITcpPeerConfigurator>(), _ => { }, _ => Task.CompletedTask, _ => { }, (_, _) => { });
var endpoint = new IPEndPoint(IPAddress.Loopback, server.LocalEndpoints.First().Port);
await client.Connect(endpoint);
A TcpPeer instance can act as both a client and a server at the same time. Complex peer topologies can be built by starting every peer as a server, then connecting each one to remote peers:
_ = peer.RunAsync(new System.Net.IPEndPoint(IPAddress.Any, 0), CancellationToken.None);
await peer.Connect(endpoint1);
await peer.Connect(endpoint2);
ReadOnlyMemory<byte> seq = TestSequence();
using (var request = client.Send(endpoint, "test", default))
{
    for (int i = 0; i < SEGMENTS; i++)
    {
        await request.Writer.WriteAsync(seq.Slice(64 * i, 64));
    }
    request.Writer.Complete();
    var readResult = await request.Reader.ReadAtLeastAsync(SEGMENTS * 64);
    ReadOnlyMemory<byte> array = readResult.Buffer.Slice(0, SEGMENTS * 64).ToArray();
    Debug.Assert(seq.Span.SequenceEqual(array.Span));
}
using (var request = server.Broadcast("test", default))
{
    for (int i = 0; i < SEGMENTS; i++)
    {
        await request.Writer.WriteAsync(seq.Slice(64 * i, 64));
    }
    request.Writer.Complete();
    var nbResponses = 0;
    await foreach (var response in request.GetResponses())
    {
        var readResult = await response.Reader.ReadAtLeastAsync(SEGMENTS * 64);
        ReadOnlyMemory<byte> array = readResult.Buffer.Slice(0, SEGMENTS * 64).ToArray();
        Debug.Assert(seq.Span.SequenceEqual(array.Span));
        response.Reader.Complete();
        nbResponses++;
    }
    Debug.Assert(nbResponses == NB_PEERS);
}
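The broadcast loop above consumes one response per peer through an IAsyncEnumerable. As a rough illustration of that consumption pattern, the sketch below simulates the responses locally: GetResponses here is a hypothetical local function backed by in-memory pipes, not the library's method, and NbPeers and the payload are made-up values. It only shows how await foreach interacts with one PipeReader per response.

```csharp
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Threading.Tasks;

const int NbPeers = 3; // assumed mesh size, for illustration only

ReadOnlyMemory<byte> payload = new byte[] { 1, 2, 3, 4 };
int nbResponses = 0;

// Each response arrives as soon as its (simulated) peer has answered.
await foreach (PipeReader reader in GetResponses(NbPeers, payload))
{
    ReadResult result = await reader.ReadAtLeastAsync(payload.Length);
    ReadOnlyMemory<byte> echoed = result.Buffer.Slice(0, payload.Length).ToArray();
    if (!payload.Span.SequenceEqual(echoed.Span))
    {
        throw new InvalidOperationException("Unexpected response body.");
    }

    // Release the response's reader before moving on to the next peer.
    reader.AdvanceTo(result.Buffer.End);
    await reader.CompleteAsync();
    nbResponses++;
}

Console.WriteLine(nbResponses);

// Hypothetical stand-in for a broadcast: one in-memory pipe per simulated peer,
// each one echoing the payload back and then completing its writer.
static async IAsyncEnumerable<PipeReader> GetResponses(int peers, ReadOnlyMemory<byte> body)
{
    for (int i = 0; i < peers; i++)
    {
        var pipe = new Pipe();
        await pipe.Writer.WriteAsync(body);
        await pipe.Writer.CompleteAsync();
        yield return pipe.Reader;
    }
}
```

Counting the responses, as the original sample does with NB_PEERS, is a simple way to check that every peer connected at the start of the broadcast has answered.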
- Refactor into more classes/files, especially separate the TcpPeer client class from the internal transport class.
- Create a Configuration object to enable creating a peer without providing a bunch of lambdas.
- Add support for SSL!
- Refactor the pipe completion code. It's currently a mess. This is probably tied to the first item (splitting the code into more classes).
- Provide a way to manually start a request after creating the request object, to eliminate scheduling when the request body can be fully written before sending.
- CI