Akka.IO: fix TcpListener connection queue problem #7623
Depends on #7621

Still need to update this to include some changes to the default number of accepted connections.

One change I think I'm going to make is removing the "accept-batch-size" setting altogether. It's redundant - we already have TCP As for the size of the
Aaronontheweb left a comment:
Detailed my changes
| public class Bind : Akka.IO.Tcp.Command | ||
| { | ||
| public Bind(Akka.Actor.IActorRef handler, System.Net.EndPoint localAddress, int backlog = 100, System.Collections.Generic.IEnumerable<Akka.IO.Inet.SocketOption> options = null, bool pullMode = False) { } | ||
| public Bind(Akka.Actor.IActorRef handler, System.Net.EndPoint localAddress, int backlog = 1024, System.Collections.Generic.IEnumerable<Akka.IO.Inet.SocketOption> options = null, bool pullMode = False) { } |
Significantly increased the default backlog to a much more reasonable 1024, which is the default in Akka.Remote.
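For callers who would rather not depend on whichever default their installed version ships, the backlog can be passed explicitly. A minimal sketch, assuming the `Bind` constructor signature shown in the diff above; `BindExamples` and `CreateBind` are hypothetical names used only for illustration:

```csharp
using System.Net;
using Akka.Actor;
using Akka.IO;

public static class BindExamples
{
    // Hypothetical helper (not part of Akka.NET): builds a Bind command with an
    // explicit backlog instead of relying on the constructor default.
    public static Tcp.Bind CreateBind(IActorRef handler, int port, int backlog = 1024)
    {
        var endpoint = new IPEndPoint(IPAddress.Any, port);
        return new Tcp.Bind(handler, endpoint, backlog: backlog);
    }
}
```

From inside an actor this would typically be sent to the TCP manager, e.g. `Context.System.Tcp().Tell(BindExamples.CreateBind(Self, 9000));`.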
| /// SocketAsyncActorEventArgs is a wrapper around SocketAsyncEventArgs that allows us to deliver | ||
| /// notifications to actors upon completion of the operation. | ||
| /// </summary> | ||
| internal sealed class SocketAsyncActorEventArgs : SocketAsyncEventArgs |
Common pattern in Kestrel, DotNetty, and I think we did this in TurboMqtt: wrap the SocketAsyncEventArgs with additional operation-handling context built into it.
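For readers unfamiliar with the pattern, here is a rough sketch of the general idea using only the BCL. This is not the PR's `SocketAsyncActorEventArgs`; `CallbackSocketAsyncEventArgs` and its constructor argument are hypothetical and stand in for whatever operation-handling context the real class carries:

```csharp
using System;
using System.Net.Sockets;

// Hypothetical wrapper: carries a completion callback alongside the
// socket operation state held by the base class.
public sealed class CallbackSocketAsyncEventArgs : SocketAsyncEventArgs
{
    private readonly Action<SocketAsyncEventArgs> _notify;

    public CallbackSocketAsyncEventArgs(Action<SocketAsyncEventArgs> notify)
    {
        _notify = notify ?? throw new ArgumentNullException(nameof(notify));
    }

    // Called when an asynchronous socket operation using this instance completes.
    protected override void OnCompleted(SocketAsyncEventArgs e)
    {
        base.OnCompleted(e); // still raises the Completed event for any subscribers
        _notify(e);          // then hand the result to the owner
    }
}
```

In an actor-based listener the callback would presumably be something like `args => self.Tell(new AcceptCompleted(args))`, with `self` captured ahead of time. Note that this path only fires when `AcceptAsync` returns `true`; when it returns `false` the operation finished synchronously and the caller has to process the result itself.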
| private int _acceptLimit; | ||
| private SocketAsyncEventArgs[] _saeas; | ||
| private readonly int _acceptLimit = DefaultAcceptLimit; | ||
| private SocketAsyncActorEventArgs[]? _acceptPool; |
Use our custom SAEA implementation
| private readonly int _acceptLimit = DefaultAcceptLimit; | ||
| private SocketAsyncActorEventArgs[]? _acceptPool; | ||
| private bool _binding; | ||
| private static readonly EventHandler<SocketAsyncEventArgs> OnCompleted = OnIoCompleted; |
Create a static EventHandler instance
| private bool _binding; | ||
| private static readonly EventHandler<SocketAsyncEventArgs> OnCompleted = OnIoCompleted; | ||
| private sealed record AcceptCompleted(SocketAsyncEventArgs EventArgs) : INoSerializationVerificationNeeded; |
Internal message types for signaling completion / retry needed
| case SocketError.TimedOut: | ||
| case SocketError.WouldBlock: | ||
| // transient – short back‑off then retry | ||
| saea.AcceptSocket = null; |
All of these error codes are retriable - schedule a short back-off and go again.
Important note: can't use IWithTimers for this because you'd need a unique timer key each time - hence why I'm using the IScheduler. That's something we may want to improve or add to the IWithTimers interface.
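To illustrate the scheduler approach: `ScheduleTellOnce` needs no key, so several retries can be pending at once, whereas starting an `IWithTimers` timer under an existing key replaces the earlier one. A minimal sketch, not the PR's code; the actor, both message types, and the 50 ms delay are all assumptions made up for the example:

```csharp
using System;
using Akka.Actor;

public sealed class RetryingAcceptor : ReceiveActor
{
    // Hypothetical messages; the PR's internal retry message is not shown in the excerpt.
    public sealed record AcceptFailed(int SlotIndex);
    public sealed record RetryAccept(int SlotIndex);

    // Assumed back-off; the actual delay used by the PR is not shown.
    private static readonly TimeSpan RetryDelay = TimeSpan.FromMilliseconds(50);

    public RetryingAcceptor()
    {
        Receive<AcceptFailed>(failed => ScheduleRetry(failed.SlotIndex));
        Receive<RetryAccept>(retry =>
        {
            // Re-issue the accept for retry.SlotIndex here.
        });
    }

    private void ScheduleRetry(int slotIndex)
    {
        // One-shot delayed message to ourselves; no timer key required.
        Context.System.Scheduler.ScheduleTellOnce(
            RetryDelay, Self, new RetryAccept(slotIndex), Self);
    }
}
```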
| _socket.Bind(_bind.LocalAddress); | ||
| _socket.Listen(_bind.Backlog); | ||
| _saeas = Accept(_acceptLimit).ToArray(); | ||
Allocate the pool and start accepting connections on it - in theory we could do this in a single loop but this code only ever runs once.
| protected override bool Receive(object message) | ||
| { | ||
| throw new NotImplementedException(); | ||
| switch (message) |
This was, previously, the Initializing behavior.
| { | ||
| // remove event handler | ||
| saea.Completed -= OnCompleted; | ||
| saea.Dispose(); |
Properly dispose all SAEAs
| initialSocketAsyncEventArgs: config.GetInt("nr-of-socket-async-event-args", 32), | ||
| traceLogging: config.GetBoolean("trace-logging", false), | ||
| batchAcceptLimit: config.GetInt("batch-accept-limit", 10), | ||
| batchAcceptLimit: config.GetString("batch-accept-limit") == "scale-to-cpus" |
Handle the new scale-to-cpus value we accept for akka.io.tcp.batch-accept-limit
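As a configuration fragment, and assuming the setting survives in this form (the PR description above floats removing it), opting in would look roughly like this. The integer `10` is the fallback visible in the removed line of the diff:

```hocon
akka.io.tcp {
  # Fixed integer limit, as before:
  # batch-accept-limit = 10

  # String value handled by this change:
  batch-accept-limit = scale-to-cpus
}
```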
Changes
close #5988