diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt index 3abb2fd86ed..9f58233cba3 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt @@ -3995,7 +3995,7 @@ namespace Akka.IO } public class Bind : Akka.IO.Tcp.Command { - public Bind(Akka.Actor.IActorRef handler, System.Net.EndPoint localAddress, int backlog = 100, System.Collections.Generic.IEnumerable options = null, bool pullMode = False) { } + public Bind(Akka.Actor.IActorRef handler, System.Net.EndPoint localAddress, int backlog = 1024, System.Collections.Generic.IEnumerable options = null, bool pullMode = False) { } public int Backlog { get; } public Akka.Actor.IActorRef Handler { get; } public System.Net.EndPoint LocalAddress { get; } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt index 284afaaade0..605bc1f5525 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt @@ -3985,7 +3985,7 @@ namespace Akka.IO } public class Bind : Akka.IO.Tcp.Command { - public Bind(Akka.Actor.IActorRef handler, System.Net.EndPoint localAddress, int backlog = 100, System.Collections.Generic.IEnumerable options = null, bool pullMode = False) { } + public Bind(Akka.Actor.IActorRef handler, System.Net.EndPoint localAddress, int backlog = 1024, System.Collections.Generic.IEnumerable options = null, bool pullMode = False) { } public int Backlog { get; } public Akka.Actor.IActorRef Handler { get; } public System.Net.EndPoint LocalAddress { get; } diff --git a/src/core/Akka.Tests/IO/TcpListenerSpec.cs b/src/core/Akka.Tests/IO/TcpListenerSpec.cs index a6b845ad47d..e737eeddb11 100644 --- a/src/core/Akka.Tests/IO/TcpListenerSpec.cs +++ b/src/core/Akka.Tests/IO/TcpListenerSpec.cs @@ -75,7 +75,6 @@ class TestSetup private readonly IActorRef _handlerRef; private readonly TestProbe _bindCommander; private readonly TestProbe _parent; - private readonly TestProbe _selectorRouter; private readonly TestActorRef _parentRef; public TestSetup(TestKitBase kit, bool pullMode) @@ -87,7 +86,7 @@ public TestSetup(TestKitBase kit, bool pullMode) _handlerRef = _handler.Ref; _bindCommander = kit.CreateTestProbe(); _parent = kit.CreateTestProbe(); - _selectorRouter = kit.CreateTestProbe(); + SelectorRouter = kit.CreateTestProbe(); _parentRef = new TestActorRef(kit.Sys, Props.Create(() => new ListenerParent(this, pullMode))); } @@ -115,10 +114,7 @@ public async Task AttemptConnectionToEndpoint() public IActorRef Listener { get { return _parentRef.UnderlyingActor.Listener; } } - public TestProbe SelectorRouter - { - get { return _selectorRouter; } - } + public TestProbe SelectorRouter { get; } public TestProbe BindCommander { get { return _bindCommander; } } public TestProbe Parent { get { return _parent; } } @@ -128,7 +124,7 @@ public TestProbe SelectorRouter internal void AfterBind(Socket socket) => LocalEndPoint = (IPEndPoint)socket.LocalEndPoint; - class ListenerParent : ActorBase + private class ListenerParent : ActorBase { private readonly TestSetup _test; private readonly bool _pullMode; diff --git a/src/core/Akka.Tests/IO/TcpSettingsSpec.cs b/src/core/Akka.Tests/IO/TcpSettingsSpec.cs new file mode 100644 index 00000000000..b9b55059787 --- /dev/null +++ b/src/core/Akka.Tests/IO/TcpSettingsSpec.cs @@ -0,0 +1,42 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2025 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using Akka.Actor; +using Akka.Configuration; +using Akka.IO; +using FluentAssertions; +using Xunit; + +namespace Akka.Tests.IO +{ + public class TcpSettingsSpec + { + [Fact] + public void TcpSettings_should_parse_all_akka_io_tcp_config_values_correctly() + { + // Arrange: load the default reference config + var config = ConfigurationFactory.Default(); + var tcpConfig = config.GetConfig("akka.io.tcp"); + var settings = TcpSettings.Create(tcpConfig); + + // Assert: all values match akka.conf reference + settings.BufferPoolConfigPath.Should().Be("akka.io.tcp.disabled-buffer-pool"); + settings.InitialSocketAsyncEventArgs.Should().Be(32); + settings.TraceLogging.Should().BeFalse(); + settings.BatchAcceptLimit.Should().Be(Environment.ProcessorCount * 2); + settings.RegisterTimeout.Should().Be(TimeSpan.FromSeconds(5)); + settings.ReceivedMessageSizeLimit.Should().Be(int.MaxValue); + settings.ManagementDispatcher.Should().Be("akka.actor.internal-dispatcher"); + settings.FileIODispatcher.Should().Be("akka.actor.default-blocking-io-dispatcher"); + settings.TransferToLimit.Should().Be(524288); // 512 KiB + settings.FinishConnectRetries.Should().Be(5); + settings.OutgoingSocketForceIpv4.Should().BeFalse(); + settings.WriteCommandsQueueMaxSize.Should().Be(-1); + } + } +} \ No newline at end of file diff --git a/src/core/Akka/Configuration/akka.conf b/src/core/Akka/Configuration/akka.conf index 1a11eb86a71..602d7c8d4ef 100644 --- a/src/core/Akka/Configuration/akka.conf +++ b/src/core/Akka/Configuration/akka.conf @@ -763,8 +763,11 @@ akka { # The maximum number of connection that are accepted in one go, # higher numbers decrease latency, lower numbers increase fairness on - # the worker-dispatcher - batch-accept-limit = 10 + # the worker-dispatcher. + # + # By default we scale to logical CPUs * 2, but you can set this to an + # integer value greater than 0. + batch-accept-limit = scale-to-cpus # The duration a connection actor waits for a `Register` message from # its commander before aborting the connection. diff --git a/src/core/Akka/IO/Tcp.cs b/src/core/Akka/IO/Tcp.cs index 94cbc3c5831..0f57bbc6cdc 100644 --- a/src/core/Akka/IO/Tcp.cs +++ b/src/core/Akka/IO/Tcp.cs @@ -156,12 +156,12 @@ public class Bind : Command /// /// The actor who will be handling the TCP listener. /// The local endpoint we are binding to. - /// TCP backlog - the number of pending connections that the queue will hold. + /// TCP backlog - the number of pending connections that the queue will hold. Defaults to 1024. /// A set of socket options. /// Specifies whether we're running in "pull mode" or not. public Bind(IActorRef handler, EndPoint localAddress, - int backlog = 100, + int backlog = 1024, IEnumerable options = null, bool pullMode = false) { diff --git a/src/core/Akka/IO/TcpListener.cs b/src/core/Akka/IO/TcpListener.cs index b3db55feea7..a7312f2d88e 100644 --- a/src/core/Akka/IO/TcpListener.cs +++ b/src/core/Akka/IO/TcpListener.cs @@ -6,105 +6,99 @@ //----------------------------------------------------------------------- using System; -using System.Net; using System.Net.Sockets; using Akka.Actor; using Akka.Dispatch; using Akka.Event; using Akka.Util.Internal; -using System.Collections.Generic; -using System.Linq; using System.Threading.Tasks; namespace Akka.IO { - class TcpListener : ActorBase, IRequiresMessageQueue + /// + /// SocketAsyncEventArgs is a wrapper around SocketAsyncEventArgs that allows us to deliver + /// notifications to actors upon completion of the operation. + /// + internal sealed class SocketAsyncActorEventArgs : SocketAsyncEventArgs + { + public SocketAsyncActorEventArgs(IActorRef notifyMe, EventHandler onCompleted) + { + NotifyMe = notifyMe; + Completed += onCompleted; + } + + /// + /// The actor we're going to notify once the operation is completed. + /// + public IActorRef NotifyMe { get; } + } + + /// + /// INTERNAL API + /// + /// TcpListener is an internal actor that binds to a local address and listens for incoming TCP connections. + /// + internal sealed class TcpListener : ActorBase, IRequiresMessageQueue { private readonly TcpExt _tcp; - private readonly IActorRef _bindCommander; + private readonly IActorRef _bindCommander; // forwarded destination for Connected private Tcp.Bind _bind; private Socket _socket; private readonly ILoggingAdapter _log = Context.GetLogger(); - private int _acceptLimit; - private SocketAsyncEventArgs[] _saeas; + private readonly int _acceptLimit; + private SocketAsyncActorEventArgs[]? _acceptPool; private bool _binding; + private static readonly EventHandler OnCompleted = OnIoCompleted; + + private sealed record AcceptCompleted(SocketAsyncEventArgs EventArgs) : INoSerializationVerificationNeeded; + + private sealed record RetryAccept(SocketAsyncEventArgs EventArgs) : INoSerializationVerificationNeeded; - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD public TcpListener(TcpExt tcp, IActorRef bindCommander, Tcp.Bind bind) { _tcp = tcp; - _bindCommander = bindCommander; + _acceptLimit = tcp.Settings.BatchAcceptLimit; + + if (_acceptLimit <= 0) + { + _log.Warning("Batch accept limit is set to {0}, which is less than or equal to 0. " + + "This value will HANG the listener.", _acceptLimit);; + + _acceptLimit = TcpSettings.DefaultAcceptLimit; + _log.Warning("Using default value of {0} for batch accept limit", _acceptLimit); + } - Become(Initializing()); + _bindCommander = bindCommander; + Self.Tell(bind); } - private Receive Initializing() => message => + private Receive Bound() => message => { switch (message) { - case Tcp.Bind bind: - if (_binding) - { - _log.Warning("Already trying to bind to TCP channel on endpoint [{0}]", _bind.LocalAddress); - return true; - } - _binding = true; - _bind = bind; - _acceptLimit = bind.PullMode ? 0 : _tcp.Settings.BatchAcceptLimit; - BindAsync().PipeTo(Self); - return true; - - case Status.Failure fail: - _bindCommander.Tell(_bind.FailureMessage.WithCause(fail.Cause)); - _log.Error(fail.Cause, "Bind failed for TCP channel on endpoint [{0}]", _bind.LocalAddress); - Context.Stop(Self); - _binding = false; + case AcceptCompleted accepted: + HandleAccept(accepted.EventArgs); return true; - - case Tcp.Bound bound: - Context.Watch(_bind.Handler); - _bindCommander.Tell(bound); - Become(Bound()); - _binding = false; - return true; - - default: - return false; - } - }; - private Receive Bound() => message => - { - switch (message) - { - case SocketEvent evt: - var saea = evt.Args; - if (saea.SocketError == SocketError.Success) - Context.ActorOf(Props.Create(_tcp, saea.AcceptSocket, _bind.Handler, _bind.Options, _bind.PullMode).WithDeploy(Deploy.Local)); - - saea.AcceptSocket = null; - if (!_socket.AcceptAsync(saea)) - Self.Tell(new SocketEvent(saea)); + case RetryAccept retry: + StartAccept(retry.EventArgs); return true; - case Tcp.ResumeAccepting resumeAccepting: - _acceptLimit = resumeAccepting.BatchSize; - // TODO: this is dangerous, previous async args are not disposed and there's no guarantee that they're not still receiving data - _saeas = Accept(_acceptLimit).ToArray(); + case Tcp.ResumeAccepting: + // NO-OP - this is obsolete return true; - case Tcp.Unbind _: + case Tcp.Unbind: Become(Unbinding(Sender)); UnbindAsync().PipeTo(Self); return true; + case Status.Failure failure: + _log.Error(failure.Cause, "Received SocketAsyncEventArgs failure"); + return true; + default: return false; } @@ -119,27 +113,74 @@ private Receive Unbinding(IActorRef requester) => message => _log.Debug("Unbound endpoint {0}, stopping listener", _bind.LocalAddress); Context.Stop(Self); return true; - + case Status.Failure fail: _log.Error(fail.Cause, "Failed to unbind TCP listener for address [{0}]", _bind.LocalAddress); Context.Stop(Self); return true; - + default: return false; } }; - - private IEnumerable Accept(int limit) + + private void StartAccept(SocketAsyncEventArgs saea) { - for(var i = 0; i < limit; i++) + var pending = _socket.AcceptAsync(saea); + if (!pending) + Self.Tell(new AcceptCompleted(saea), Self); // synchronous completion ➔ mailbox + } + + private static void OnIoCompleted(object? sender, SocketAsyncEventArgs saea) + { + // Marshall back into the actor context – keeps user code off the IOCP thread. + var actorArgs = (SocketAsyncActorEventArgs)saea; + + var actor = actorArgs.NotifyMe; + if (actorArgs.LastOperation == SocketAsyncOperation.Accept) { - var self = Self; - var saea = new SocketAsyncEventArgs(); - saea.Completed += (_, e) => self.Tell(new SocketEvent(e)); - if (!_socket.AcceptAsync(saea)) - Self.Tell(new SocketEvent(saea)); - yield return saea; + actor.Tell(new AcceptCompleted(saea), actor); + } + else // should never happen + { + // This should never happen, but just in case. + var ioe = new InvalidOperationException( + $"SocketAsyncEventArgs last operation is not Accept: {actorArgs.LastOperation}"); + actor.Tell(new Status.Failure(ioe), actor); + + // retry the operation + actorArgs.AcceptSocket = null; + actor.Tell(new RetryAccept(actorArgs)); + } + } + + private void HandleAccept(SocketAsyncEventArgs saea) + { + switch (saea.SocketError) + { + case SocketError.Success: + var accepted = saea.AcceptSocket!; + saea.AcceptSocket = null; // ready for re‑use + Context.ActorOf(Props + .Create(_tcp, accepted, _bind.Handler, _bind.Options, _bind.PullMode) + .WithDeploy(Deploy.Local)); + StartAccept(saea); // keep the pool full + break; + + case SocketError.ConnectionReset: + case SocketError.NoBufferSpaceAvailable: + case SocketError.TryAgain: + case SocketError.TimedOut: + case SocketError.WouldBlock: + // transient – short back‑off then retry + saea.AcceptSocket = null; + Context.System.Scheduler.ScheduleTellOnce(TimeSpan.FromMilliseconds(10), Self, + new RetryAccept(saea), ActorRefs.NoSender); + break; + default: + _log.Error("Fatal socket error in TcpListener: {0}", saea.SocketError); + Context.Stop(Self); + break; } } @@ -155,7 +196,17 @@ private IEnumerable Accept(int limit) _bind.Options.ForEach(x => x.BeforeServerSocketBind(_socket)); _socket.Bind(_bind.LocalAddress); _socket.Listen(_bind.Backlog); - _saeas = Accept(_acceptLimit).ToArray(); + + _acceptPool = new SocketAsyncActorEventArgs[_acceptLimit]; + for (var i = 0; i < _acceptPool.Length; i++) + { + var saea = new SocketAsyncActorEventArgs(Self, OnCompleted); + _acceptPool[i] = saea; + } + + // start accepting connections + foreach (var saea in _acceptPool) + StartAccept(saea); return Task.FromResult(new Tcp.Bound(_socket.LocalEndPoint)); } @@ -178,7 +229,7 @@ private IEnumerable Accept(int limit) return Task.FromException(ex); } } - + protected override SupervisorStrategy SupervisorStrategy() { return Tcp.ConnectionSupervisorStrategy; @@ -186,33 +237,60 @@ protected override SupervisorStrategy SupervisorStrategy() protected override bool Receive(object message) { - throw new NotImplementedException(); + switch (message) + { + case Tcp.Bind bind: + if (_binding) + { + _log.Warning("Already trying to bind to TCP channel on endpoint [{0}]", _bind.LocalAddress); + return true; + } + + _binding = true; + _bind = bind; + + _log.Info("Binding TCP channel on endpoint [{0}]", _bind.LocalAddress); + + BindAsync().PipeTo(Self); + return true; + + case Status.Failure fail: + _bindCommander.Tell(_bind.FailureMessage.WithCause(fail.Cause)); + _log.Error(fail.Cause, "Bind failed for TCP channel on endpoint [{0}]", _bind.LocalAddress); + Context.Stop(Self); + _binding = false; + return true; + + case Tcp.Bound bound: + Context.Watch(_bind.Handler); + _bindCommander.Tell(bound); + Become(Bound()); + _binding = false; + return true; + + default: + return false; + } } - /// - /// TBD - /// protected override void PostStop() { try { + if (_acceptPool != null) + foreach (var saea in _acceptPool) + { + // remove event handler + saea.Completed -= OnCompleted; + saea.Dispose(); + } + _socket?.Dispose(); - _saeas?.ForEach(x => x.Dispose()); } catch (Exception e) { _log.Debug("Error closing ServerSocketChannel: {0}", e); } } - - private readonly struct SocketEvent : INoSerializationVerificationNeeded - { - public readonly SocketAsyncEventArgs Args; - - public SocketEvent(SocketAsyncEventArgs args) - { - Args = args; - } - } } -} +} \ No newline at end of file diff --git a/src/core/Akka/IO/TcpManager.cs b/src/core/Akka/IO/TcpManager.cs index efbb6ad3604..2c1189008ff 100644 --- a/src/core/Akka/IO/TcpManager.cs +++ b/src/core/Akka/IO/TcpManager.cs @@ -57,7 +57,6 @@ internal sealed class TcpManager : ActorBase public TcpManager(TcpExt tcp) { _tcp = tcp; - Context.System.EventStream.Subscribe(Self, typeof(DeadLetter)); } protected override bool Receive(object message) @@ -76,14 +75,6 @@ protected override bool Receive(object message) Context.ActorOf(Props.Create(_tcp, commander, b).WithDeploy(Deploy.Local)); return true; } - case DeadLetter dl: - { - if (dl.Message is SocketCompleted completed) - { - //TODO: release resources? - } - return true; - } default: throw new ArgumentException($"The supplied message of type {message.GetType().Name} is invalid. Only Connect and Bind messages are supported. " + $"If you are going to manage your connection state, you need to communicate with Tcp.Connected sender actor. " + diff --git a/src/core/Akka/IO/TcpSettings.cs b/src/core/Akka/IO/TcpSettings.cs index 0847e49f1c5..33c19e0316c 100644 --- a/src/core/Akka/IO/TcpSettings.cs +++ b/src/core/Akka/IO/TcpSettings.cs @@ -25,7 +25,11 @@ public static TcpSettings Create(ActorSystem system) { var config = system.Settings.Config.GetConfig("akka.io.tcp"); if (config.IsNullOrEmpty()) - throw ConfigurationException.NullOrEmptyConfig("akka.io.tcp");//($"Failed to create {typeof(TcpSettings)}: akka.io.tcp configuration node not found"); + throw + ConfigurationException + .NullOrEmptyConfig< + TcpSettings>( + "akka.io.tcp"); //($"Failed to create {typeof(TcpSettings)}: akka.io.tcp configuration node not found"); return Create(config); } @@ -44,7 +48,9 @@ public static TcpSettings Create(Config config) bufferPoolConfigPath: config.GetString("buffer-pool", "akka.io.tcp.disabled-buffer-pool"), initialSocketAsyncEventArgs: config.GetInt("nr-of-socket-async-event-args", 32), traceLogging: config.GetBoolean("trace-logging", false), - batchAcceptLimit: config.GetInt("batch-accept-limit", 10), + batchAcceptLimit: config.GetString("batch-accept-limit") == "scale-to-cpus" + ? DefaultAcceptLimit + : config.GetInt("batch-accept-limit", DefaultAcceptLimit), registerTimeout: config.GetTimeSpan("register-timeout", TimeSpan.FromSeconds(5)), receivedMessageSizeLimit: config.GetString("max-received-message-size", "unlimited") == "unlimited" ? int.MaxValue @@ -59,18 +65,23 @@ public static TcpSettings Create(Config config) writeCommandsQueueMaxSize: config.GetInt("write-commands-queue-max-size", -1)); } - public TcpSettings( string bufferPoolConfigPath, - int initialSocketAsyncEventArgs, - bool traceLogging, - int batchAcceptLimit, - TimeSpan? registerTimeout, - int receivedMessageSizeLimit, - string managementDispatcher, - string fileIoDispatcher, - int transferToLimit, - int finishConnectRetries, - bool outgoingSocketForceIpv4, - int writeCommandsQueueMaxSize) + /// + /// Default size of the SAEA pool + /// + internal static readonly int DefaultAcceptLimit = Environment.ProcessorCount * 2; + + public TcpSettings(string bufferPoolConfigPath, + int initialSocketAsyncEventArgs, + bool traceLogging, + int batchAcceptLimit, + TimeSpan? registerTimeout, + int receivedMessageSizeLimit, + string managementDispatcher, + string fileIoDispatcher, + int transferToLimit, + int finishConnectRetries, + bool outgoingSocketForceIpv4, + int writeCommandsQueueMaxSize) { BufferPoolConfigPath = bufferPoolConfigPath; InitialSocketAsyncEventArgs = initialSocketAsyncEventArgs; @@ -88,7 +99,7 @@ public TcpSettings( string bufferPoolConfigPath, /// /// A config path to the section defining which byte buffer pool to use. - /// Buffer pools are used to mitigate GC-pressure made by potentiall allocation + /// Buffer pools are used to mitigate GC-pressure made by potential allocation /// and deallocation of byte buffers used for writing/receiving data from sockets. /// public string BufferPoolConfigPath { get; } @@ -112,7 +123,7 @@ public TcpSettings( string bufferPoolConfigPath, /// worker-dispatcher /// public int BatchAcceptLimit { get; } - + /// /// The duration a connection actor waits for a `Register` message from /// its commander before aborting the connection. @@ -165,7 +176,7 @@ public TcpSettings( string bufferPoolConfigPath, /// in cases when DnsEndPoint is used to describe the remote address /// public bool OutgoingSocketForceIpv4 { get; } - + /// /// Limits maximum size of internal queue, used in connection actor /// to store pending write commands. @@ -173,4 +184,4 @@ public TcpSettings( string bufferPoolConfigPath, /// public int WriteCommandsQueueMaxSize { get; } } -} +} \ No newline at end of file