Add System.IO.Pipelines to be able to better support Async #949
Conversation
FYI @bollhals
I haven't looked at the code, but just an advisory: don't expose any of the Pipelines.Sockets.Unofficial types in the public API. The reason for this is that "bedrock" (i.e. the official transports) should add client connections in full at some point, at which point you'll probably want to disentangle my lib and use that instead.
Thanks for the pointer Marc. None of them are exposed in the public API, and we should make sure to keep it that way :)
@mgravell thank you for chiming in :)
I just quickly scanned through the PR, but I quickly realized I need to read up on the pipelines more to get a better understanding of the impact of this change. |
I'm a bit swamped right now so I haven't looked in detail, but have you considered leaving the code as-is, with the channels in between (which would also allow more sophisticated batching to be implemented later, right?), and just floating task completion sources so that things become awaitable on the upper layers where needed? This would allow us to build upon the existing optimizations without having to take a dependency on pipelines just yet. Maybe my idea is moot anyway because I missed some details by just glancing over, but I wanted to drop the idea as early as possible. This input was actually the thing I was hoping we could align on in the call I suggested, so that all the reviewers share a common understanding of where things are heading. But maybe it is enough to outline the rough plan and alternatives in an issue or a markdown file in the repo.
The pipelines maintain internal buffers, so they take care of the batching, and they are very low-allocation because they reuse ValueTask objects and memory pools. All that maintenance goes away from the codebase, and instead it becomes a simpler network (de)serializer with the IO taken care of by the pipelines. They also implement async Socket/Stream reading/writing with minimal allocations, which isn't trivial to do on .NET Framework because the async APIs on streams return Task instead of ValueTask (this was improved in .NET Core). With proper async interfaces at the IO level, implementing low-allocation async APIs for the client becomes a lot easier, and it frees us from the manual reusable-buffer maintenance for IO that previously had to be done with ArrayPools. That was the biggest driver for this PR since it was trivial to do :) But I'm loving this discussion!
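To illustrate how the pipelines take over buffer management, here is a minimal `PipeReader` consume loop, assuming a hypothetical `ProcessBuffer` frame parser (the names are illustrative, not the PR's actual code):

```csharp
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Threading.Tasks;

static class ReadLoopSketch
{
    public static async Task ReadLoopAsync(PipeReader reader)
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync().ConfigureAwait(false);
            ReadOnlySequence<byte> buffer = result.Buffer;

            // Parse as many whole frames as are available; ProcessBuffer is
            // assumed to slice consumed bytes off the front of `buffer`.
            ProcessBuffer(ref buffer);

            // Tell the pipe what we consumed and what we examined; unconsumed
            // bytes stay in the pipe's internal buffer until more data arrives.
            reader.AdvanceTo(buffer.Start, buffer.End);

            if (result.IsCompleted)
            {
                break;
            }
        }
        reader.Complete();
    }

    private static void ProcessBuffer(ref ReadOnlySequence<byte> buffer)
    {
        // Frame parsing elided; a complete parser would slice `buffer` here.
    }
}
```

The key point is that there is no `byte[]` rental or copy in the loop itself: the pipe owns the memory, and `AdvanceTo` is how ownership is handed back.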
Yeah, and with the TCS approach ValueTask is definitely out of the window.
To connect to the previous discussion: as far as I'm aware, pipelines are handy for single-reader/single-writer scenarios, which essentially means we have to synchronize access to them, as you did. By doing so we make it possible to use the model concurrently, but we pay the overhead of the underlying synchronization again. Are we making the right tradeoffs from the perspective of the impact this will have on the higher-level abstractions (the internal benefits have already been outlined)?
We were essentially synchronizing with the channels too; it was just abstracted away behind the TryWrite/ReadAsync calls, where the writes were buffered (possibly without bound) because we used an unconstrained Channel. Pipelines does the same really: FlushAsync returns immediately (making the lock very short) if it still has buffer room to fill, and only schedules an awaiter if the underlying buffer needs to be flushed. That way it handles back-pressure better and more precisely, which is something the current Channels implementation doesn't do since we used an unconstrained channel; it could have if we had used a constrained one, but then we are synchronizing again.

One bug that is possible with our current channel implementation is that very aggressive message publishing can run the client out of memory if the network throughput is unable to keep up with the amount of data being sent for an extended period. That bug is fixed with pipelines, but again, it requires locking, which we'd need anyway if we wanted to fix it in the current channels implementation by using a constrained channel.

The benchmarks I've done have not surfaced any throughput degradation in the optimal scenario (RabbitMQ server running on my local machine), and profiling shows little time spent waiting for locks. It would be great to have more benchmarks if anyone has a remote RabbitMQ server they can hammer on for more tests :)
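The back-pressure behavior described above can be sketched with a `PipeWriter` write path (illustrative only, not the PR's code): `FlushAsync` completes synchronously while the pipe is under its pause threshold and only becomes a real await once the reader falls behind, which is what bounds memory use.

```csharp
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;

static class WriteSketch
{
    public static async ValueTask WriteFrameAsync(PipeWriter writer, ReadOnlyMemory<byte> frame)
    {
        // Copy the frame into the pipe's own buffer and commit it.
        frame.Span.CopyTo(writer.GetSpan(frame.Length));
        writer.Advance(frame.Length);

        // Completes synchronously when the pipe has room; awaits (applying
        // back-pressure to the caller) when the reader is behind.
        FlushResult result = await writer.FlushAsync().ConfigureAwait(false);
        if (result.IsCompleted)
        {
            // The reading side has finished; stop producing.
        }
    }
}
```

The thresholds themselves are configurable, e.g. `new PipeOptions(pauseWriterThreshold: 64 * 1024, resumeWriterThreshold: 32 * 1024)`, which is what replaces the "constrained channel" idea discussed above.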
With the latest changes, it is now pretty trivial to create a fully async version of BasicPublish, and pretty much all commands that do not require a response (those that utilize the ModelSend method), so that's one step closer to an async client :) I can provide an example later. Commands that require a response need some more infrastructure to work, but are perfectly doable; it's better to just do one step at a time :)
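As a rough sketch of what such an async publish could look like: `BasicPublishAsync` and `ModelSendAsync` do not exist in the client at this point, and the method/property names below are assumptions for illustration only.

```csharp
// Hypothetical shape of an async publish built on the pipelines transport.
// Fire-and-forget commands only need their frames flushed to the pipe, so
// awaiting the flush is enough; no response frame is expected.
public async ValueTask BasicPublishAsync(string exchange, string routingKey,
    bool mandatory, IBasicProperties basicProperties, ReadOnlyMemory<byte> body)
{
    // BasicPublish method frame; field names mirror the wire protocol.
    var method = new BasicPublish(exchange, routingKey, mandatory, immediate: false);

    // ModelSendAsync (assumed) serializes the method, header and body frames
    // into the PipeWriter and awaits FlushAsync for back-pressure.
    await ModelSendAsync(method, basicProperties, body).ConfigureAwait(false);
}
```

Commands that expect a reply would additionally need a continuation registered per outstanding RPC (e.g. a reusable `IValueTaskSource` keyed by channel), which is the "more infrastructure" mentioned above.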
I've read up a bit on pipelines and read your explanations, very interesting I must say =) What I do not quite get is the benefits they bring: what couldn't we do with channels+streams that we can now do, or do more simply, with pipelines?
It's the ValueTask async implementation I'm mostly after. That is not trivial to get right on .NET Framework, and the same goes for the IO buffer management, which is taken care of by Pipelines. It basically unlocks a lot of async implementation paths that would otherwise require a lot of clever ValueTask implementations to get right without incurring a LOT of allocation overhead, especially on .NET Framework where there are no ValueTask implementations for Streams, for example. The reduced ArrayPool Rent/Return traffic is just an added benefit, as is the automatic back-pressure handling, which avoids possible OOM exceptions in the case of slow connections plus rapid generation of big events with the current unconstrained channels. I'd highly suggest reading Marc Gravell's excellent blog posts on the subject (https://blog.marcgravell.com/2018/07/pipe-dreams-part-1.html?m=1) as well; they are very insightful and clearly show the benefits of the ValueTask implementations, as well as the effort involved in getting reusable ValueTasks right.
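The "reusable ValueTask" machinery referenced above can be sketched as follows, assuming the `System.Threading.Tasks.Sources` types (available on .NET Framework via the System.Threading.Tasks.Extensions package). This is a minimal illustration, not the PR's code: one source object is reused across awaits instead of allocating a new Task per operation.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

// A resettable awaitable: each Signal completes the pending WaitAsync,
// and Reset makes the same object reusable with zero further allocation.
sealed class ReusableSignal : IValueTaskSource
{
    private ManualResetValueTaskSourceCore<bool> _core;

    public ValueTask WaitAsync() => new ValueTask(this, _core.Version);

    public void Signal() => _core.SetResult(true);

    void IValueTaskSource.GetResult(short token)
    {
        _core.GetResult(token);
        _core.Reset(); // ready for the next await
    }

    ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) => _core.GetStatus(token);

    void IValueTaskSource.OnCompleted(Action<object> continuation, object state,
        short token, ValueTaskSourceOnCompletedFlags flags)
        => _core.OnCompleted(continuation, state, token, flags);
}
```

Getting the token/version handling right (so a stale await faults instead of observing the wrong result) is exactly the subtlety the blog series walks through, and it is why having pipelines do this internally is attractive.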
Are you planning to add more benchmarks? Right now I only found benchmarks for the serializer, but nothing regarding networking?
Networking benchmarks are almost impossible to do reliably due to their nature, but I'll add a couple of profiling results showing before-and-after allocations and CPU profiles.
```csharp
}
int handled = 0;
ReadOnlySequence<byte> buffer = readResult.Buffer;
if (!buffer.IsEmpty)
```
this is redundant; ProcessBuffer will return 0 if the buffer is empty
It might still return 0 even if the buffer is not empty, since we might not have an entire frame to read.
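The partial-frame case can be made concrete with a hedged sketch (the 4-byte length prefix, `ProcessBuffer` signature and `HandleFrame` helper here are illustrative, not the client's actual framing):

```csharp
using System;
using System.Buffers;
using System.Buffers.Binary;

static class FrameParserSketch
{
    // Returns the number of whole frames handled; may be 0 for a non-empty
    // buffer when only part of the next frame has arrived so far.
    public static int ProcessBuffer(ref ReadOnlySequence<byte> buffer)
    {
        int handled = 0;
        while (buffer.Length >= 4)
        {
            Span<byte> header = stackalloc byte[4];
            buffer.Slice(0, 4).CopyTo(header);
            int frameLength = BinaryPrimitives.ReadInt32BigEndian(header);

            if (buffer.Length < 4 + frameLength)
            {
                break; // partial frame: leave the bytes for the next read
            }

            HandleFrame(buffer.Slice(4, frameLength));
            buffer = buffer.Slice(4 + frameLength); // consume the frame
            handled++;
        }
        return handled;
    }

    private static void HandleFrame(ReadOnlySequence<byte> frame)
    {
        // Dispatch elided; a real implementation decodes the frame here.
    }
}
```

Because unconsumed bytes stay in the pipe after `AdvanceTo`, returning 0 here simply means "wait for more data" rather than an error.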
Here are benchmarks from the latest runs after improvements and code fixes. Benchmark: send and receive 500,000 messages with a 512-byte payload and a Guid set as the correlation id. (Result screenshots: .NET Framework 4.8 and .NET Core 3.1.)
Benchmark code:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class Program
{
    private static int messagesSent = 0;
    private static int messagesReceived = 0;
    private static int batchesToSend = 100;
    private static int itemsPerBatch = 5000;
    private static ManualResetEventSlim manualResetEventSlim = new ManualResetEventSlim();

    static async Task Main(string[] args)
    {
        var connectionString = new Uri("amqp://guest:guest@localhost/");
        var connectionFactory = new ConnectionFactory() { DispatchConsumersAsync = true, Uri = connectionString };
        var connection = connectionFactory.CreateConnection();
        var connection2 = connectionFactory.CreateConnection();
        var publisher = connection.CreateModel();
        var subscriber = connection2.CreateModel();

        publisher.ExchangeDeclare("test", ExchangeType.Topic, true);
        subscriber.QueueDeclare("testqueue", true, false, true);
        var asyncListener = new AsyncEventingBasicConsumer(subscriber);
        asyncListener.Received += AsyncListener_Received;
        subscriber.QueueBind("testqueue", "test", "");
        subscriber.BasicConsume("testqueue", true, "testconsumer", asyncListener);

        byte[] payload = new byte[512];
        var batchPublish = Task.Run(() =>
        {
            while (messagesSent < batchesToSend * itemsPerBatch)
            {
                var header = publisher.CreateBasicProperties();
                header.CorrelationId = $"{Guid.NewGuid()}";
                publisher.BasicPublish("test", "", false, header, payload);
                messagesSent++;
            }
        });

        await batchPublish.ConfigureAwait(false);
        manualResetEventSlim.Wait();
        Console.WriteLine("Done receiving all messages.");
        subscriber.Dispose();
        publisher.Dispose();
        connection.Dispose();
        connection2.Dispose();
    }

    private static Task AsyncListener_Received(object sender, BasicDeliverEventArgs @event)
    {
        // Compare against the incremented value directly so the check is race-free.
        if (Interlocked.Increment(ref messagesReceived) == batchesToSend * itemsPerBatch)
        {
            manualResetEventSlim.Set();
        }
        return Task.CompletedTask;
    }
}
```
Rebased and squashed the commits to clean stuff up
```diff
 new UnboundedChannelOptions
 {
-    AllowSynchronousContinuations = false,
+    AllowSynchronousContinuations = true,
```
What's the resulting change for this?
Does publish now possibly write to the pipe/socket? If so, is this what we want?
Publish will write to the channel, but the continuation might run synchronously if the WriteLoop completes synchronously, for example when the Pipeline had enough empty buffer to receive the data. This is what we want: it avoids the async overhead for synchronous completions, since the Pipelines take care of the async part.
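For context, this is the shape of the channel configuration being discussed; the comments describe the effect of each flag (the frame payload type is simplified to `ReadOnlyMemory<byte>` for illustration):

```csharp
using System;
using System.Threading.Channels;

// Sketch of the outgoing-frame channel and the tradeoff under discussion.
var outgoing = Channel.CreateUnbounded<ReadOnlyMemory<byte>>(new UnboundedChannelOptions
{
    SingleReader = true,  // only the WriteLoop drains the channel
    SingleWriter = false, // any publisher thread may enqueue
    // With this set to true, the WriteLoop's continuation may run inline on
    // the publishing thread when TryWrite wakes a pending ReadAsync, saving
    // a thread-pool hop, but it can keep that publisher thread busy longer.
    AllowSynchronousContinuations = true
});
```

This is exactly the concern raised next: the saved scheduling hop is paid for by the publisher potentially executing pipe writes itself.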
But this also means that the thread that called publish might be blocked for a longer time, doesn't it?
Question: How important is .NET Framework for 7.0?
From what I can read out of the images, it seems that:
So while I really like the pipeline (thanks for forcing me to finally read up on them 👍), I have trouble seeing the benefits, while the code is also getting harder to understand (at least for now, since not a lot of us are experts on pipes). On .NET Framework I also struggle to see big benefits. PS: I have a local change that modifies all "Method" classes to be structs, and hence drops those allocations. But in order to do that, it needs to serialize the data before writing it to the channel (structs can't be inherited, and the different structs can't be put in the OutputFrame). I don't see a way of dropping these allocations with this change (excluding maybe pooling). PPS: @stebet To be clear, I don't want to devalue your work here, I very much appreciate it! I really like this PR, and I learned a lot.
I have yet to do more benchmarks, as the ones I've done are not very parallel and therefore induce a lot of overhead in async calls, but again, I mainly see this PR as a stepping stone towards a real low-allocation async API for the client. ValueTask is already available on .NET Framework. Also, having pipelines in place makes the client a lot more ready for "Bedrock", the new client/server abstractions being introduced to .NET. Yes, the code is more convoluted, but that's mostly isolated to the IO part, which frankly can be very hard to do efficiently and asynchronously in a clean way (see the Task-based Streams in .NET Framework for reference, which have HUGE allocation penalties, especially for small reads). I also learned a lot, and there is no pressure to accept this PR, but I can show some further examples of how this can provide us with a true async API, hopefully side-by-side with the existing one, which I'm planning to experiment with next :)
Nice one, I was also thinking about that one to get heap allocations down.
```csharp
token = _singleWriterMutex.TryWait(WaitOptions.NoDelay);
if (!token.Success)
{
    // Didn't get the lock immediately, let's backlog the write and start the backlog writer task.
```
General question: this makes it possible that the order of published messages could change. Is this "allowed"? Does any documentation say anything about this?
Can they? Considering that we are using a write lock, which only one can hold at a time: if we can't get the lock synchronously, we add the write to the backlog, since the backlog writer is the one holding the lock. I'll take a closer look at this as well; I'm shamelessly stealing this logic from StackExchange.Redis :)
Good question, my thought process was:
- T1 has the lock, WriteFrameToPipe
- T2 didn't get the lock, wrote frame to channel, about to start the Task.Run
- T3 just entered the method
=> If T1 releases the lock and T3 gets the lock before the backlog writer does, T3's message will be published before T2's.
But now that I think of it, this is most probably irrelevant: since multiple threads are involved, ordering is never guaranteed anyway...
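The write path under discussion can be sketched roughly as follows. The names `_singleWriterMutex`, `TryWait(WaitOptions.NoDelay)` and `WriteFrameToPipe` mirror the quoted PR code (the mutex being `MutexSlim` from Pipelines.Sockets.Unofficial); the backlog queue and helper are assumptions for illustration:

```csharp
// Sketch: frames that lose the race for the mutex are enqueued, and a single
// backlog writer drains them while holding the lock, so backlogged frames
// keep their FIFO order relative to each other.
private void PublishFrame(ReadOnlyMemory<byte> frame)
{
    var token = _singleWriterMutex.TryWait(WaitOptions.NoDelay);
    if (token.Success)
    {
        // Fast path: we own the lock, write directly to the pipe.
        try { WriteFrameToPipe(frame); }
        finally { token.Dispose(); }
        return;
    }

    // Contended path: queue the frame and ensure exactly one backlog
    // writer task is running to drain the queue under the lock.
    _backlog.Enqueue(frame);
    EnsureBacklogWriterStarted();
}
```

The ordering question above is about the window between enqueueing and the backlog writer acquiring the lock, during which a third thread can win the fast path.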
```csharp
{
    if (_writerTask == null || _writerTask.Status == TaskStatus.RanToCompletion)
    {
        _writerTask = Task.Run(WriteLoop, CancellationToken.None);
```
This (although unlikely) could lead to a race condition where two Tasks are created.
Yeah, I have yet to clean that up a bit, thanks for pointing it out :)
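One hedged way to close that race is to install the new task with `Interlocked.CompareExchange` so that only one thread wins; the field and method names below are illustrative, assuming `WriteLoop` is a `Func<Task>` as in the quoted snippet:

```csharp
using System.Threading;
using System.Threading.Tasks;

private Task _writerTask;

private void EnsureWriteLoopRunning()
{
    Task current = Volatile.Read(ref _writerTask);
    if (current == null || current.IsCompleted)
    {
        // Cold (unstarted) task, so losers of the race never run it.
        var fresh = new Task<Task>(WriteLoop);
        if (Interlocked.CompareExchange(ref _writerTask, fresh, current) == current)
        {
            fresh.Start(TaskScheduler.Default); // only the winner starts the loop
        }
    }
}
```

Creating the task cold and starting it only after winning the exchange is what prevents two `WriteLoop` instances from ever running concurrently.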
It might also be time to have it updated a bit. I can maybe take a look at it this week.
I'm working on refreshing this PR and rebuilding it on top of the latest master. I'll probably force-update it once done :)
@stebet thank you. |
So, I'm working on updating this PR, changed a bit and rebased on the latest
Due to the length of this PR convo, I'm going to close this PR and create a fresh one, taking the feedback from this one into account, to explain things in more detail and to clear up all the clutter.
Proposed Changes
This PR adds System.IO.Pipelines and replaces the Stream+Channels implementation in the SocketFrameHandler. It does so via Pipelines.Sockets.Unofficial, which is written and maintained by @mgravell from Stack Overflow and used, for example, in their StackExchange.Redis library.
No public facing API changes.
This is good preparation for adding async support, since we can now do the network IO completely asynchronously.
Notable changes:
Types of Changes
Checklist
CONTRIBUTING.md document