From 9c843acf7f49c56f5844c2f42c16725b4b404404 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 6 May 2025 14:48:14 -0500 Subject: [PATCH 01/12] Akka.IO.Tcp: cleaning up `TBD`s and API junk --- ...oreAPISpec.ApproveCore.DotNet.verified.txt | 11 +- .../CoreAPISpec.ApproveCore.Net.verified.txt | 11 +- src/core/Akka/IO/Tcp.cs | 275 +++++------------- src/core/Akka/IO/TcpManager.cs | 58 ++-- 4 files changed, 111 insertions(+), 244 deletions(-) diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt index afc6e038e32..33c552cb821 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt @@ -3977,10 +3977,9 @@ namespace Akka.IO protected override void PostStop() { } protected override bool Receive(object message) { } } - public class Tcp : Akka.Actor.ExtensionIdProvider + public sealed class Tcp : Akka.Actor.ExtensionIdProvider { public static readonly Akka.Actor.SupervisorStrategy ConnectionSupervisorStrategy; - public static readonly Akka.IO.Tcp Instance; public Tcp() { } public override Akka.IO.TcpExt CreateExtension(Akka.Actor.ExtendedActorSystem system) { } public static Akka.Actor.IActorRef Manager(Akka.Actor.ActorSystem system) { } @@ -4031,6 +4030,7 @@ namespace Akka.IO } public sealed class CommandFailed : Akka.IO.Tcp.Event { + public CommandFailed(Akka.IO.Tcp.Command cmd, Akka.Util.Option ex) { } public CommandFailed(Akka.IO.Tcp.Command cmd) { } public Akka.Util.Option Cause { get; } [Akka.Annotations.InternalApiAttribute()] @@ -4078,6 +4078,7 @@ namespace Akka.IO public class ConnectionClosed : Akka.IO.Tcp.Event, Akka.Event.IDeadLetterSuppression { public ConnectionClosed() { } + [System.Runtime.CompilerServices.NullableAttribute(2)] public virtual string Cause { get; } public virtual bool IsAborted { get; } public virtual bool IsConfirmed { get; } @@ -4087,6 +4088,7 @@ namespace Akka.IO public sealed class ErrorClosed : Akka.IO.Tcp.ConnectionClosed { public ErrorClosed(string cause) { } + [System.Runtime.CompilerServices.NullableAttribute(2)] public override string Cause { get; } public override bool IsErrorClosed { get; } public override string ToString() { } @@ -4191,15 +4193,14 @@ namespace Akka.IO { public static Akka.Actor.IActorRef Tcp(this Akka.Actor.ActorSystem system) { } } - public class TcpMessage + public class static TcpMessage { - public TcpMessage() { } public static Akka.IO.Tcp.Command Abort() { } public static Akka.IO.Tcp.Command Bind(Akka.Actor.IActorRef handler, System.Net.EndPoint endpoint, int backlog, System.Collections.Generic.IEnumerable options, bool pullMode) { } public static Akka.IO.Tcp.Command Bind(Akka.Actor.IActorRef handler, System.Net.EndPoint endpoint, int backlog) { } public static Akka.IO.Tcp.Command Close() { } public static Akka.IO.Tcp.Command ConfirmedClose() { } - public static Akka.IO.Tcp.Command Connect(System.Net.EndPoint remoteAddress, System.Net.EndPoint localAddress, System.Collections.Generic.IEnumerable options, System.Nullable timeout, bool pullMode) { } + public static Akka.IO.Tcp.Command Connect(System.Net.EndPoint remoteAddress, [System.Runtime.CompilerServices.NullableAttribute(2)] System.Net.EndPoint localAddress, System.Collections.Generic.IEnumerable options, System.Nullable timeout, bool pullMode) { } public static Akka.IO.Tcp.Command Connect(System.Net.EndPoint remoteAddress) { } public static Akka.IO.Tcp.NoAck NoAck(object token = null) { } public static Akka.IO.Tcp.Command Register(Akka.Actor.IActorRef handler, bool keepOpenOnPeerClosed = False, bool useResumeWriting = True) { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt index 895d2bd5d76..82946f8bc58 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt @@ -3967,10 +3967,9 @@ namespace Akka.IO protected override void PostStop() { } protected override bool Receive(object message) { } } - public class Tcp : Akka.Actor.ExtensionIdProvider + public sealed class Tcp : Akka.Actor.ExtensionIdProvider { public static readonly Akka.Actor.SupervisorStrategy ConnectionSupervisorStrategy; - public static readonly Akka.IO.Tcp Instance; public Tcp() { } public override Akka.IO.TcpExt CreateExtension(Akka.Actor.ExtendedActorSystem system) { } public static Akka.Actor.IActorRef Manager(Akka.Actor.ActorSystem system) { } @@ -4021,6 +4020,7 @@ namespace Akka.IO } public sealed class CommandFailed : Akka.IO.Tcp.Event { + public CommandFailed(Akka.IO.Tcp.Command cmd, Akka.Util.Option ex) { } public CommandFailed(Akka.IO.Tcp.Command cmd) { } public Akka.Util.Option Cause { get; } [Akka.Annotations.InternalApiAttribute()] @@ -4068,6 +4068,7 @@ namespace Akka.IO public class ConnectionClosed : Akka.IO.Tcp.Event, Akka.Event.IDeadLetterSuppression { public ConnectionClosed() { } + [System.Runtime.CompilerServices.NullableAttribute(2)] public virtual string Cause { get; } public virtual bool IsAborted { get; } public virtual bool IsConfirmed { get; } @@ -4077,6 +4078,7 @@ namespace Akka.IO public sealed class ErrorClosed : Akka.IO.Tcp.ConnectionClosed { public ErrorClosed(string cause) { } + [System.Runtime.CompilerServices.NullableAttribute(2)] public override string Cause { get; } public override bool IsErrorClosed { get; } public override string ToString() { } @@ -4181,15 +4183,14 @@ namespace Akka.IO { public static Akka.Actor.IActorRef Tcp(this Akka.Actor.ActorSystem system) { } } - public class TcpMessage + public class static TcpMessage { - public TcpMessage() { } public static Akka.IO.Tcp.Command Abort() { } public static Akka.IO.Tcp.Command Bind(Akka.Actor.IActorRef handler, System.Net.EndPoint endpoint, int backlog, System.Collections.Generic.IEnumerable options, bool pullMode) { } public static Akka.IO.Tcp.Command Bind(Akka.Actor.IActorRef handler, System.Net.EndPoint endpoint, int backlog) { } public static Akka.IO.Tcp.Command Close() { } public static Akka.IO.Tcp.Command ConfirmedClose() { } - public static Akka.IO.Tcp.Command Connect(System.Net.EndPoint remoteAddress, System.Net.EndPoint localAddress, System.Collections.Generic.IEnumerable options, System.Nullable timeout, bool pullMode) { } + public static Akka.IO.Tcp.Command Connect(System.Net.EndPoint remoteAddress, [System.Runtime.CompilerServices.NullableAttribute(2)] System.Net.EndPoint localAddress, System.Collections.Generic.IEnumerable options, System.Nullable timeout, bool pullMode) { } public static Akka.IO.Tcp.Command Connect(System.Net.EndPoint remoteAddress) { } public static Akka.IO.Tcp.NoAck NoAck(object token = null) { } public static Akka.IO.Tcp.Command Register(Akka.Actor.IActorRef handler, bool keepOpenOnPeerClosed = False, bool useResumeWriting = True) { } diff --git a/src/core/Akka/IO/Tcp.cs b/src/core/Akka/IO/Tcp.cs index a5e2cac3e13..2cc3d4236ab 100644 --- a/src/core/Akka/IO/Tcp.cs +++ b/src/core/Akka/IO/Tcp.cs @@ -23,28 +23,19 @@ namespace Akka.IO /// /// The set of TCP capabilities for Akka.IO are exposed via this extension. /// - public class Tcp : ExtensionIdProvider + public sealed class Tcp : ExtensionIdProvider { + // TODO: refactor this in v1.6 to use a `.For` method with the correct ExtensionId provider setup + private static readonly Tcp PluginInstance = new(); + /// - /// TBD - /// - public static readonly Tcp Instance = new(); - - /// - /// TBD + /// Fetches the TCP manager actor for the given actor system. /// - /// TBD - /// TBD public static IActorRef Manager(ActorSystem system) { - return Instance.Apply(system).Manager; + return PluginInstance.Apply(system).Manager; } - - /// - /// TBD - /// - /// TBD - /// TBD + public override TcpExt CreateExtension(ExtendedActorSystem system) { return new TcpExt(system); @@ -82,32 +73,24 @@ private SocketConnected() { } #endregion /// - /// TBD + /// Akka.IO Tcp messages are all derived from this class. /// public class Message : INoSerializationVerificationNeeded { } #region user commands // COMMANDS + /// - /// TBD + /// All Akka.IO.Tcp commands inherit from this class. /// public abstract class Command : Message { - private readonly CommandFailed _failureMessage; - /// - /// TBD + /// A predefined failure message which can be used to indicate that a command + /// failed during processing. /// - protected Command() - { - _failureMessage = new CommandFailed(this); - } - - /// - /// TBD - /// - public CommandFailed FailureMessage => _failureMessage; + public CommandFailed FailureMessage => new CommandFailed(this); } /// @@ -134,7 +117,7 @@ public Connect(EndPoint remoteAddress, { RemoteAddress = remoteAddress; LocalAddress = localAddress; - Options = options ?? Enumerable.Empty(); + Options = options ?? []; Timeout = timeout; PullMode = pullMode; } @@ -191,7 +174,7 @@ public Bind(IActorRef handler, Handler = handler; LocalAddress = localAddress; Backlog = backlog; - Options = options ?? Enumerable.Empty(); + Options = options ?? []; PullMode = pullMode; } @@ -526,63 +509,7 @@ public static Write Create(ByteString data, Event ack) return new Write(data, ack); } } - - /* - /// - /// Write `count` bytes starting at `position` from file at `filePath` to the connection. - /// The count must be > 0. The connection actor will reply with a - /// message if the write could not be enqueued. If - /// returns true, the connection actor will reply with the supplied - /// token once the write has been successfully enqueued to the O/S kernel. - /// Note that this does not in any way guarantee that the data will be - /// or have been sent! Unfortunately there is no way to determine whether - /// a particular write has been sent by the O/S. - /// - public class WriteFile : SimpleWriteCommand - { - private readonly Event _ack; - - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - public WriteFile(string filePath, long position, long count, Event ack) - { - if (position < 0) throw new ArgumentException("WriteFile.position must be >= 0", nameof(position)); - if (count <= 0) throw new ArgumentException("WriteFile.count must be > 0", nameof(count)); - - _ack = ack; - FilePath = filePath; - Position = position; - Count = count; - } - - /// - /// TBD - /// - public string FilePath { get; } - /// - /// TBD - /// - public long Position { get; } - /// - /// TBD - /// - public long Count { get; } - - /// - /// TBD - /// - public override Event Ack => _ack; - - public override string ToString() => - $"WriteFile(path: {FilePath}, position: {Position}, count: {Count}, ack: {Ack})"; - } - */ + /// /// A write command which aggregates two other write commands. Using this construct /// you can chain a number of commands together in a way @@ -750,18 +677,11 @@ public class Event : Message /// public sealed class Received : Event { - /// - /// TBD - /// - /// TBD public Received(ByteString data) { Data = data; } - - /// - /// TBD - /// + public ByteString Data { get; } public override string ToString() => @@ -776,11 +696,6 @@ public override string ToString() => /// public sealed class Connected : Event { - /// - /// TBD - /// - /// TBD - /// TBD public Connected(EndPoint remoteAddress, EndPoint localAddress) { RemoteAddress = remoteAddress; @@ -788,11 +703,12 @@ public Connected(EndPoint remoteAddress, EndPoint localAddress) } /// - /// TBD + /// The remote endpoint of the connection. /// public EndPoint RemoteAddress { get; } + /// - /// TBD + /// The local endpoint of the connection. /// public EndPoint LocalAddress { get; } @@ -806,21 +722,25 @@ public override string ToString() => /// public sealed class CommandFailed : Event { - /// - /// TBD - /// - /// TBD - public CommandFailed(Command cmd) => Cmd = cmd; + public CommandFailed(Command cmd, Option ex) + { + Cmd = cmd; + Cause = ex; + } + + public CommandFailed(Command cmd) : this(cmd, Option.None) + { + } /// - /// TBD + /// The original command which failed. /// public Command Cmd { get; } /// /// Optionally contains the cause why the command failed. /// - public Option Cause { get; private set; } = Option.None; + public Option Cause { get; private set; } /// /// Creates a copy of this object with a new cause set. @@ -847,9 +767,6 @@ public CommandFailed WithCause(Exception cause) /// public class WritingResumed : Event { - /// - /// TBD - /// public static readonly WritingResumed Instance = new(); private WritingResumed() @@ -905,29 +822,29 @@ private Unbound() public class ConnectionClosed : Event, IDeadLetterSuppression { /// - /// TBD + /// Was the connection closed normally? /// public virtual bool IsAborted => false; /// - /// TBD + /// Can we confirm that the connection was open in the first place? /// public virtual bool IsConfirmed => false; /// - /// TBD + /// Is our remote peer closed too? /// public virtual bool IsPeerClosed => false; /// - /// TBD + /// Did the connection close due to an IO error? /// public virtual bool IsErrorClosed => false; /// - /// TBD + /// Was there a given cause for why the connection was closed? /// - public virtual string Cause => null; + public virtual string? Cause => null; } /// @@ -935,9 +852,6 @@ public class ConnectionClosed : Event, IDeadLetterSuppression /// public class Closed : ConnectionClosed { - /// - /// TBD - /// public static readonly Closed Instance = new(); private Closed() @@ -950,18 +864,12 @@ private Closed() /// public class Aborted : ConnectionClosed { - /// - /// TBD - /// public static readonly Aborted Instance = new(); private Aborted() { } - - /// - /// TBD - /// + public override bool IsAborted => true; } @@ -971,18 +879,12 @@ private Aborted() /// public class ConfirmedClosed : ConnectionClosed { - /// - /// TBD - /// public static readonly ConfirmedClosed Instance = new(); private ConfirmedClosed() { } - - /// - /// TBD - /// + public override bool IsConfirmed => true; } @@ -991,18 +893,12 @@ private ConfirmedClosed() /// public class PeerClosed : ConnectionClosed { - /// - /// TBD - /// public static readonly PeerClosed Instance = new(); private PeerClosed() { } - - /// - /// TBD - /// + public override bool IsPeerClosed => true; } @@ -1011,25 +907,14 @@ private PeerClosed() /// public sealed class ErrorClosed : ConnectionClosed { - /// - /// TBD - /// - /// TBD public ErrorClosed(string cause) { Cause = cause; } - /// - /// TBD - /// public override bool IsErrorClosed => true; - - /// - /// TBD - /// - /// TBD - public override string Cause { get; } + + public override string? Cause { get; } public override string ToString() => $"ErrorClosed('{Cause}')"; @@ -1061,7 +946,7 @@ protected override void LogFailure(IActorContext context, IActorRef child, Excep } /// - /// TBD + /// Akka.IO TCP extension - provides an actor-based API for TCP socket communication. /// public sealed class TcpExt : IOExtension { @@ -1093,7 +978,7 @@ internal TcpExt(ExtendedActorSystem system, TcpSettings settings) public IBufferPool BufferPool { get; } /// - /// TBD + /// The settings used by this extension. /// public TcpSettings Settings { get; } @@ -1126,21 +1011,20 @@ private IBufferPool CreateBufferPool(ExtendedActorSystem system, Config config) } /// - /// TBD + /// Helpers for generating TCP messages. /// - public class TcpMessage + public static class TcpMessage { /// - /// TBD + /// Connect to a remote TCP endpoint. /// - /// TBD - /// TBD - /// TBD - /// TBD - /// TDB - /// TBD + /// The remote endpoint + /// An optional local endpoint address to bind to. Most users don't specify this. + /// A set of socket options. + /// An optional connect timeout. Will result in a message being returned if we exceed this value. + /// Specifies whether we're running in "pull mode" or not. public static Tcp.Command Connect(EndPoint remoteAddress, - EndPoint localAddress, + EndPoint? localAddress, IEnumerable options, TimeSpan? timeout, bool pullMode) @@ -1149,24 +1033,22 @@ public static Tcp.Command Connect(EndPoint remoteAddress, } /// - /// TBD + /// Connect to a remote TCP endpoint. /// - /// TBD - /// TBD + /// The remote endpoint public static Tcp.Command Connect(EndPoint remoteAddress) { - return Connect(remoteAddress, null, null, null, false); + return Connect(remoteAddress, null, [], null, false); } /// - /// TBD + /// Bind a TCP listener to a local endpoint. /// - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD + /// The actor who will be handling the TCP listener. + /// The local endpoint we are binding to. + /// TCP backlog - the number of pending connections that the queue will hold. + /// A set of socket options. + /// Specifies whether we're running in "pull mode" or not for all subsequent client connections. public static Tcp.Command Bind(IActorRef handler, EndPoint endpoint, int backlog, @@ -1177,24 +1059,22 @@ public static Tcp.Command Bind(IActorRef handler, } /// - /// TBD + /// Bind a TCP listener to a local endpoint. /// - /// TBD - /// TBD - /// TBD - /// TBD + /// The actor who will be handling the TCP listener. + /// The local endpoint we are binding to. + /// TCP backlog - the number of pending connections that the queue will hold. public static Tcp.Command Bind(IActorRef handler, EndPoint endpoint, int backlog) { return new Tcp.Bind(handler, endpoint, backlog); } /// - /// TBD + /// Registers an actor to handle an outgoing or incoming TCP connection that has been established. /// - /// TBD + /// The actor who will be handling the TCP communication. /// TBD /// TBD - /// TBD public static Tcp.Command Register(IActorRef handler, bool keepOpenOnPeerClosed = false, bool useResumeWriting = true) { @@ -1202,36 +1082,32 @@ public static Tcp.Command Register(IActorRef handler, bool keepOpenOnPeerClosed } /// - /// TBD + /// Unbinds a previously bound TCP listener. /// - /// TBD public static Tcp.Command Unbind() { return Tcp.Unbind.Instance; } /// - /// TBD + /// Closes an open TCP connection. /// - /// TBD public static Tcp.Command Close() { return Tcp.Close.Instance; } /// - /// TBD + /// Closes a confirmed-to-have-been-previously-running TCP connection. /// - /// TBD public static Tcp.Command ConfirmedClose() { return Tcp.ConfirmedClose.Instance; } /// - /// TBD + /// Aborts a TCP connection without flushing pending writes. /// - /// TBD public static Tcp.Command Abort() { return Tcp.Abort.Instance; @@ -1297,15 +1173,14 @@ public static Tcp.Command ResumeAccepting(int batchSize) } /// - /// TBD + /// Convenience methods for using the Akka.IO.Tcp extension. /// public static class TcpExtensions { /// - /// TBD + /// Returns the -specific instance for TCP connectivity. /// - /// TBD - /// TBD + /// The current actor system. public static IActorRef Tcp(this ActorSystem system) { return IO.Tcp.Manager(system); diff --git a/src/core/Akka/IO/TcpManager.cs b/src/core/Akka/IO/TcpManager.cs index b590d8618f8..efbb6ad3604 100644 --- a/src/core/Akka/IO/TcpManager.cs +++ b/src/core/Akka/IO/TcpManager.cs @@ -53,52 +53,42 @@ namespace Akka.IO internal sealed class TcpManager : ActorBase { private readonly TcpExt _tcp; - - /// - /// TBD - /// - /// TBD + public TcpManager(TcpExt tcp) { _tcp = tcp; Context.System.EventStream.Subscribe(Self, typeof(DeadLetter)); } - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD protected override bool Receive(object message) { - var c = message as Connect; - if (c != null) - { - var commander = Sender; - Context.ActorOf(Props.Create(_tcp, commander, c).WithDeploy(Deploy.Local)); - return true; - } - var b = message as Bind; - if (b != null) - { - var commander = Sender; - Context.ActorOf(Props.Create(_tcp, commander, b).WithDeploy(Deploy.Local)); - return true; - } - var dl = message as DeadLetter; - if (dl != null) + switch (message) { - var completed = dl.Message as SocketCompleted; - if (completed != null) + case Connect c: + { + var commander = Sender; + Context.ActorOf(Props.Create(_tcp, commander, c).WithDeploy(Deploy.Local)); + return true; + } + case Bind b: + { + var commander = Sender; + Context.ActorOf(Props.Create(_tcp, commander, b).WithDeploy(Deploy.Local)); + return true; + } + case DeadLetter dl: { - //TODO: release resources? + if (dl.Message is SocketCompleted completed) + { + //TODO: release resources? + } + return true; } - return true; + default: + throw new ArgumentException($"The supplied message of type {message.GetType().Name} is invalid. Only Connect and Bind messages are supported. " + + $"If you are going to manage your connection state, you need to communicate with Tcp.Connected sender actor. " + + $"See more here: https://getakka.net/articles/networking/io.html", nameof(message)); } - throw new ArgumentException($"The supplied message of type {message.GetType().Name} is invalid. Only Connect and Bind messages are supported. " + - $"If you are going to manage your connection state, you need to communicate with Tcp.Connected sender actor. " + - $"See more here: https://getakka.net/articles/networking/io.html", nameof(message)); } } } From 35abd6ab85fd89babc00acd9ff80d576ad8f0795 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 6 May 2025 14:52:19 -0500 Subject: [PATCH 02/12] fixed compilation error --- src/core/Akka.Tests/IO/TcpListenerSpec.cs | 2 +- src/core/Akka/IO/Tcp.cs | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/core/Akka.Tests/IO/TcpListenerSpec.cs b/src/core/Akka.Tests/IO/TcpListenerSpec.cs index db15a98bbf7..a6b845ad47d 100644 --- a/src/core/Akka.Tests/IO/TcpListenerSpec.cs +++ b/src/core/Akka.Tests/IO/TcpListenerSpec.cs @@ -143,7 +143,7 @@ public ListenerParent(TestSetup test, bool pullMode) _listener = Context.ActorOf(Props.Create(() => new TcpListener( - Tcp.Instance.Apply(Context.System), + Tcp.For(Context.System), test._bindCommander.Ref, new Tcp.Bind( _test._handler.Ref, diff --git a/src/core/Akka/IO/Tcp.cs b/src/core/Akka/IO/Tcp.cs index 2cc3d4236ab..f890b8fe98d 100644 --- a/src/core/Akka/IO/Tcp.cs +++ b/src/core/Akka/IO/Tcp.cs @@ -36,6 +36,11 @@ public static IActorRef Manager(ActorSystem system) return PluginInstance.Apply(system).Manager; } + public static TcpExt For(ActorSystem system) + { + return PluginInstance.Apply(system); + } + public override TcpExt CreateExtension(ExtendedActorSystem system) { return new TcpExt(system); From d62723ef3574fb1f736b38c6a99cb13783d0c990 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 6 May 2025 14:52:42 -0500 Subject: [PATCH 03/12] made `Tcp.For` `internal` --- src/core/Akka/IO/Tcp.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/Akka/IO/Tcp.cs b/src/core/Akka/IO/Tcp.cs index f890b8fe98d..4c310b47292 100644 --- a/src/core/Akka/IO/Tcp.cs +++ b/src/core/Akka/IO/Tcp.cs @@ -36,7 +36,7 @@ public static IActorRef Manager(ActorSystem system) return PluginInstance.Apply(system).Manager; } - public static TcpExt For(ActorSystem system) + internal static TcpExt For(ActorSystem system) { return PluginInstance.Apply(system); } From 7bd46af4024c3ad10e42e56e2b0d7f93a074f0e8 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 6 May 2025 15:10:51 -0500 Subject: [PATCH 04/12] `sealed` a ton of `Tcp` classes that can't be inherited --- ...oreAPISpec.ApproveCore.DotNet.verified.txt | 32 +- .../CoreAPISpec.ApproveCore.Net.verified.txt | 32 +- src/core/Akka/IO/Tcp.cs | 299 ++++++------------ 3 files changed, 129 insertions(+), 234 deletions(-) diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt index 33c552cb821..42477cca77a 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt @@ -3988,7 +3988,7 @@ namespace Akka.IO public static readonly Akka.IO.Tcp.Abort Instance; public override Akka.IO.Tcp.ConnectionClosed Event { get; } } - public class Aborted : Akka.IO.Tcp.ConnectionClosed + public sealed class Aborted : Akka.IO.Tcp.ConnectionClosed { public static readonly Akka.IO.Tcp.Aborted Instance; public override bool IsAborted { get; } @@ -4003,13 +4003,13 @@ namespace Akka.IO public bool PullMode { get; } public override string ToString() { } } - public class Bound : Akka.IO.Tcp.Event + public sealed class Bound : Akka.IO.Tcp.Event { public Bound(System.Net.EndPoint localAddress) { } public System.Net.EndPoint LocalAddress { get; } public override string ToString() { } } - public class Close : Akka.IO.Tcp.CloseCommand + public sealed class Close : Akka.IO.Tcp.CloseCommand { public static readonly Akka.IO.Tcp.Close Instance; public override Akka.IO.Tcp.ConnectionClosed Event { get; } @@ -4019,7 +4019,7 @@ namespace Akka.IO protected CloseCommand() { } public abstract Akka.IO.Tcp.ConnectionClosed Event { get; } } - public class Closed : Akka.IO.Tcp.ConnectionClosed + public sealed class Closed : Akka.IO.Tcp.ConnectionClosed { public static readonly Akka.IO.Tcp.Closed Instance; } @@ -4040,7 +4040,7 @@ namespace Akka.IO [Akka.Annotations.InternalApiAttribute()] public Akka.IO.Tcp.CommandFailed WithCause(System.Exception cause) { } } - public class CompoundWrite : Akka.IO.Tcp.WriteCommand, System.Collections.Generic.IEnumerable, System.Collections.IEnumerable + public sealed class CompoundWrite : Akka.IO.Tcp.WriteCommand, System.Collections.Generic.IEnumerable, System.Collections.IEnumerable { public CompoundWrite(Akka.IO.Tcp.SimpleWriteCommand head, Akka.IO.Tcp.WriteCommand tailCommand) { } public Akka.IO.Tcp.SimpleWriteCommand Head { get; } @@ -4048,17 +4048,17 @@ namespace Akka.IO public System.Collections.Generic.IEnumerator GetEnumerator() { } public override string ToString() { } } - public class ConfirmedClose : Akka.IO.Tcp.CloseCommand + public sealed class ConfirmedClose : Akka.IO.Tcp.CloseCommand { public static readonly Akka.IO.Tcp.ConfirmedClose Instance; public override Akka.IO.Tcp.ConnectionClosed Event { get; } } - public class ConfirmedClosed : Akka.IO.Tcp.ConnectionClosed + public sealed class ConfirmedClosed : Akka.IO.Tcp.ConnectionClosed { public static readonly Akka.IO.Tcp.ConfirmedClosed Instance; public override bool IsConfirmed { get; } } - public class Connect : Akka.IO.Tcp.Command + public sealed class Connect : Akka.IO.Tcp.Command { public Connect(System.Net.EndPoint remoteAddress, System.Net.EndPoint localAddress = null, System.Collections.Generic.IEnumerable options = null, System.Nullable timeout = null, bool pullMode = False) { } public System.Net.EndPoint LocalAddress { get; } @@ -4108,7 +4108,7 @@ namespace Akka.IO public object Token { get; } public override string ToString() { } } - public class PeerClosed : Akka.IO.Tcp.ConnectionClosed + public sealed class PeerClosed : Akka.IO.Tcp.ConnectionClosed { public static readonly Akka.IO.Tcp.PeerClosed Instance; public override bool IsPeerClosed { get; } @@ -4127,17 +4127,17 @@ namespace Akka.IO public bool UseResumeWriting { get; } public override string ToString() { } } - public class ResumeAccepting : Akka.IO.Tcp.Command + public sealed class ResumeAccepting : Akka.IO.Tcp.Command { public ResumeAccepting(int batchSize) { } public int BatchSize { get; } public override string ToString() { } } - public class ResumeReading : Akka.IO.Tcp.Command + public sealed class ResumeReading : Akka.IO.Tcp.Command { public static readonly Akka.IO.Tcp.ResumeReading Instance; } - public class ResumeWriting : Akka.IO.Tcp.Command + public sealed class ResumeWriting : Akka.IO.Tcp.Command { public static readonly Akka.IO.Tcp.ResumeWriting Instance; } @@ -4148,7 +4148,7 @@ namespace Akka.IO public bool WantsAck { get; } public Akka.IO.Tcp.CompoundWrite Append(Akka.IO.Tcp.WriteCommand that) { } } - public class SuspendReading : Akka.IO.Tcp.Command + public sealed class SuspendReading : Akka.IO.Tcp.Command { public static readonly Akka.IO.Tcp.SuspendReading Instance; } @@ -4156,11 +4156,11 @@ namespace Akka.IO { public static readonly Akka.IO.Tcp.Unbind Instance; } - public class Unbound : Akka.IO.Tcp.Event + public sealed class Unbound : Akka.IO.Tcp.Event { public static readonly Akka.IO.Tcp.Unbound Instance; } - public class Write : Akka.IO.Tcp.SimpleWriteCommand + public sealed class Write : Akka.IO.Tcp.SimpleWriteCommand { public static readonly Akka.IO.Tcp.Write Empty; public override Akka.IO.Tcp.Event Ack { get; } @@ -4177,7 +4177,7 @@ namespace Akka.IO public Akka.IO.Tcp.CompoundWrite Prepend(Akka.IO.Tcp.SimpleWriteCommand other) { } public Akka.IO.Tcp.WriteCommand Prepend(System.Collections.Generic.IEnumerable writes) { } } - public class WritingResumed : Akka.IO.Tcp.Event + public sealed class WritingResumed : Akka.IO.Tcp.Event { public static readonly Akka.IO.Tcp.WritingResumed Instance; } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt index 82946f8bc58..19c79d6e562 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt @@ -3978,7 +3978,7 @@ namespace Akka.IO public static readonly Akka.IO.Tcp.Abort Instance; public override Akka.IO.Tcp.ConnectionClosed Event { get; } } - public class Aborted : Akka.IO.Tcp.ConnectionClosed + public sealed class Aborted : Akka.IO.Tcp.ConnectionClosed { public static readonly Akka.IO.Tcp.Aborted Instance; public override bool IsAborted { get; } @@ -3993,13 +3993,13 @@ namespace Akka.IO public bool PullMode { get; } public override string ToString() { } } - public class Bound : Akka.IO.Tcp.Event + public sealed class Bound : Akka.IO.Tcp.Event { public Bound(System.Net.EndPoint localAddress) { } public System.Net.EndPoint LocalAddress { get; } public override string ToString() { } } - public class Close : Akka.IO.Tcp.CloseCommand + public sealed class Close : Akka.IO.Tcp.CloseCommand { public static readonly Akka.IO.Tcp.Close Instance; public override Akka.IO.Tcp.ConnectionClosed Event { get; } @@ -4009,7 +4009,7 @@ namespace Akka.IO protected CloseCommand() { } public abstract Akka.IO.Tcp.ConnectionClosed Event { get; } } - public class Closed : Akka.IO.Tcp.ConnectionClosed + public sealed class Closed : Akka.IO.Tcp.ConnectionClosed { public static readonly Akka.IO.Tcp.Closed Instance; } @@ -4030,7 +4030,7 @@ namespace Akka.IO [Akka.Annotations.InternalApiAttribute()] public Akka.IO.Tcp.CommandFailed WithCause(System.Exception cause) { } } - public class CompoundWrite : Akka.IO.Tcp.WriteCommand, System.Collections.Generic.IEnumerable, System.Collections.IEnumerable + public sealed class CompoundWrite : Akka.IO.Tcp.WriteCommand, System.Collections.Generic.IEnumerable, System.Collections.IEnumerable { public CompoundWrite(Akka.IO.Tcp.SimpleWriteCommand head, Akka.IO.Tcp.WriteCommand tailCommand) { } public Akka.IO.Tcp.SimpleWriteCommand Head { get; } @@ -4038,17 +4038,17 @@ namespace Akka.IO public System.Collections.Generic.IEnumerator GetEnumerator() { } public override string ToString() { } } - public class ConfirmedClose : Akka.IO.Tcp.CloseCommand + public sealed class ConfirmedClose : Akka.IO.Tcp.CloseCommand { public static readonly Akka.IO.Tcp.ConfirmedClose Instance; public override Akka.IO.Tcp.ConnectionClosed Event { get; } } - public class ConfirmedClosed : Akka.IO.Tcp.ConnectionClosed + public sealed class ConfirmedClosed : Akka.IO.Tcp.ConnectionClosed { public static readonly Akka.IO.Tcp.ConfirmedClosed Instance; public override bool IsConfirmed { get; } } - public class Connect : Akka.IO.Tcp.Command + public sealed class Connect : Akka.IO.Tcp.Command { public Connect(System.Net.EndPoint remoteAddress, System.Net.EndPoint localAddress = null, System.Collections.Generic.IEnumerable options = null, System.Nullable timeout = null, bool pullMode = False) { } public System.Net.EndPoint LocalAddress { get; } @@ -4098,7 +4098,7 @@ namespace Akka.IO public object Token { get; } public override string ToString() { } } - public class PeerClosed : Akka.IO.Tcp.ConnectionClosed + public sealed class PeerClosed : Akka.IO.Tcp.ConnectionClosed { public static readonly Akka.IO.Tcp.PeerClosed Instance; public override bool IsPeerClosed { get; } @@ -4117,17 +4117,17 @@ namespace Akka.IO public bool UseResumeWriting { get; } public override string ToString() { } } - public class ResumeAccepting : Akka.IO.Tcp.Command + public sealed class ResumeAccepting : Akka.IO.Tcp.Command { public ResumeAccepting(int batchSize) { } public int BatchSize { get; } public override string ToString() { } } - public class ResumeReading : Akka.IO.Tcp.Command + public sealed class ResumeReading : Akka.IO.Tcp.Command { public static readonly Akka.IO.Tcp.ResumeReading Instance; } - public class ResumeWriting : Akka.IO.Tcp.Command + public sealed class ResumeWriting : Akka.IO.Tcp.Command { public static readonly Akka.IO.Tcp.ResumeWriting Instance; } @@ -4138,7 +4138,7 @@ namespace Akka.IO public bool WantsAck { get; } public Akka.IO.Tcp.CompoundWrite Append(Akka.IO.Tcp.WriteCommand that) { } } - public class SuspendReading : Akka.IO.Tcp.Command + public sealed class SuspendReading : Akka.IO.Tcp.Command { public static readonly Akka.IO.Tcp.SuspendReading Instance; } @@ -4146,11 +4146,11 @@ namespace Akka.IO { public static readonly Akka.IO.Tcp.Unbind Instance; } - public class Unbound : Akka.IO.Tcp.Event + public sealed class Unbound : Akka.IO.Tcp.Event { public static readonly Akka.IO.Tcp.Unbound Instance; } - public class Write : Akka.IO.Tcp.SimpleWriteCommand + public sealed class Write : Akka.IO.Tcp.SimpleWriteCommand { public static readonly Akka.IO.Tcp.Write Empty; public override Akka.IO.Tcp.Event Ack { get; } @@ -4167,7 +4167,7 @@ namespace Akka.IO public Akka.IO.Tcp.CompoundWrite Prepend(Akka.IO.Tcp.SimpleWriteCommand other) { } public Akka.IO.Tcp.WriteCommand Prepend(System.Collections.Generic.IEnumerable writes) { } } - public class WritingResumed : Akka.IO.Tcp.Event + public sealed class WritingResumed : Akka.IO.Tcp.Event { public static readonly Akka.IO.Tcp.WritingResumed Instance; } diff --git a/src/core/Akka/IO/Tcp.cs b/src/core/Akka/IO/Tcp.cs index 4c310b47292..a36b6d5465a 100644 --- a/src/core/Akka/IO/Tcp.cs +++ b/src/core/Akka/IO/Tcp.cs @@ -104,16 +104,17 @@ public abstract class Command : Message /// or the actor handling the new connection replies with a /// message. /// - public class Connect : Command + public sealed class Connect : Command { + /// - /// TBD + /// Connect to a remote TCP endpoint. /// - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD + /// The remote endpoint + /// An optional local endpoint address to bind to. Most users don't specify this. + /// A set of socket options. + /// An optional connect timeout. Will result in a message being returned if we exceed this value. + /// Specifies whether we're running in "pull mode" or not. public Connect(EndPoint remoteAddress, EndPoint localAddress = null, IEnumerable options = null, @@ -126,26 +127,14 @@ public Connect(EndPoint remoteAddress, Timeout = timeout; PullMode = pullMode; } - - /// - /// TBD - /// + public EndPoint RemoteAddress { get; } - /// - /// TBD - /// + public EndPoint LocalAddress { get; } - /// - /// TBD - /// + public IEnumerable Options { get; } - /// - /// TBD - /// + public TimeSpan? Timeout { get; } - /// - /// TBD - /// public bool PullMode { get; } public override string ToString() => @@ -163,13 +152,13 @@ public override string ToString() => public class Bind : Command { /// - /// TBD + /// Bind a TCP listener to a local endpoint. /// - /// TBD - /// TBD - /// TBD - /// TBD - /// TBD + /// The actor who will be handling the TCP listener. + /// The local endpoint we are binding to. + /// TCP backlog - the number of pending connections that the queue will hold. + /// A set of socket options. + /// Specifies whether we're running in "pull mode" or not. public Bind(IActorRef handler, EndPoint localAddress, int backlog = 100, @@ -183,25 +172,14 @@ public Bind(IActorRef handler, PullMode = pullMode; } - /// - /// TBD - /// public IActorRef Handler { get; } - /// - /// TBD - /// + public EndPoint LocalAddress { get; } - /// - /// TBD - /// + public int Backlog { get; } - /// - /// TBD - /// + public IEnumerable Options { get; } - /// - /// TBD - /// + public bool PullMode { get; } public override string ToString() => @@ -217,11 +195,11 @@ public override string ToString() => public class Register : Command { /// - /// TBD + /// Registers an actor to handle an outgoing or incoming TCP connection that has been established. /// - /// TBD - /// TBD - /// TBD + /// The actor who will be handling the TCP communication. + /// Keep the connection open if the peer is closed + /// Use resume / pause writing semantics once buffer gets full public Register(IActorRef handler, bool keepOpenOnPeerClosed = false, bool useResumeWriting = true) { Handler = handler; @@ -229,17 +207,11 @@ public Register(IActorRef handler, bool keepOpenOnPeerClosed = false, bool useRe UseResumeWriting = useResumeWriting; } - /// - /// TBD - /// + public IActorRef Handler { get; } - /// - /// TBD - /// + public bool KeepOpenOnPeerClosed { get; } - /// - /// TBD - /// + public bool UseResumeWriting { get; } public override string ToString() => @@ -253,9 +225,6 @@ public override string ToString() => /// public class Unbind : Command { - /// - /// TBD - /// public static readonly Unbind Instance = new(); private Unbind() @@ -268,7 +237,7 @@ private Unbind() public abstract class CloseCommand : Command, IDeadLetterSuppression { /// - /// TBD + /// The event to return in response to this command /// public abstract ConnectionClosed Event { get; } } @@ -279,20 +248,14 @@ public abstract class CloseCommand : Command, IDeadLetterSuppression /// data will both be notified once the socket is closed using a /// message. /// - public class Close : CloseCommand + public sealed class Close : CloseCommand { - /// - /// TBD - /// public static readonly Close Instance = new(); private Close() { } - - /// - /// TBD - /// + public override ConnectionClosed Event => Closed.Instance; } @@ -302,20 +265,14 @@ private Close() /// command and the registered handler for incoming data will both be notified /// once the socket is closed using a message. /// - public class ConfirmedClose : CloseCommand + public sealed class ConfirmedClose : CloseCommand { - /// - /// TBD - /// public static readonly ConfirmedClose Instance = new(); private ConfirmedClose() { } - - /// - /// TBD - /// + public override ConnectionClosed Event => ConfirmedClosed.Instance; } @@ -328,18 +285,12 @@ private ConfirmedClose() /// public class Abort : CloseCommand { - /// - /// TBD - /// public static readonly Abort Instance = new(); private Abort() { } - - /// - /// TBD - /// + public override ConnectionClosed Event => Aborted.Instance; } @@ -351,22 +302,15 @@ private Abort() /// public class NoAck : Event { - /// - /// TBD - /// public static readonly NoAck Instance = new(null); - - /// - /// TBD - /// - /// TBD + public NoAck(object token) { Token = token; } /// - /// TBD + /// A correlation id which can be used to identify a specific write operation. /// public object Token { get; } @@ -375,56 +319,44 @@ public override string ToString() => } /// - /// TBD + /// All write commands inherit from this class. /// public abstract class WriteCommand : Command { /// - /// TBD + /// Prepend another write before this one. /// - /// TBD - /// TBD + /// The other write to prepend + /// A compound write consisting of multiple byte buffers of non-contiguous memory public CompoundWrite Prepend(SimpleWriteCommand other) { return new CompoundWrite(other, this); } /// - /// TBD + /// Prepend a group of writes before this one. /// - /// TBD - /// TBD + /// The set of writes that will preceed this one. + /// A compound write consisting of multiple byte buffers of non-contiguous memory public WriteCommand Prepend(IEnumerable writes) { return writes.Reverse().Aggregate(this, (b, a) => { - var simple = a as SimpleWriteCommand; - if (simple != null) - return b.Prepend(simple); - - var compound = a as CompoundWrite; - if (compound != null) - return b.Prepend(compound); - - throw new ArgumentException("The supplied WriteCommand is invalid. Only SimpleWriteCommand and CompoundWrite WriteCommands are supported."); + return a switch + { + SimpleWriteCommand simple => b.Prepend(simple), + CompoundWrite compound => b.Prepend(compound), + _ => throw new ArgumentException( + "The supplied WriteCommand is invalid. Only SimpleWriteCommand and CompoundWrite WriteCommands are supported.") + }; }); } - - /// - /// TBD - /// - /// TBD - /// TBD + public static WriteCommand Create(IEnumerable writes) { return Write.Empty.Prepend(writes); } - - /// - /// TBD - /// - /// TBD - /// TBD + public static WriteCommand Create(params WriteCommand[] writes) { return Create((IEnumerable)writes); @@ -432,25 +364,25 @@ public static WriteCommand Create(params WriteCommand[] writes) } /// - /// TBD + /// A non-compounded write /// public abstract class SimpleWriteCommand : WriteCommand { /// - /// TBD + /// An optional acknowledgment event which will be sent to the sender of this command /// public abstract Event Ack { get; } /// - /// TBD + /// Indicates whether this message needs to be ACK'd to the handler. /// - public bool WantsAck => !(Ack is NoAck); + public bool WantsAck => Ack is not NoAck; /// - /// TBD + /// Appends a write after this one. /// - /// TBD - /// TBD + /// The next write to append. + /// A compound write of non-contiguous memory. public CompoundWrite Append(WriteCommand that) { return that.Prepend(this); @@ -467,20 +399,20 @@ public CompoundWrite Append(WriteCommand that) /// or have been sent! Unfortunately there is no way to determine whether /// a particular write has been sent by the O/S. /// - public class Write : SimpleWriteCommand + public sealed class Write : SimpleWriteCommand { /// - /// TBD + /// Write with no data and /// public static readonly Write Empty = new(ByteString.Empty, NoAck.Instance); /// - /// TBD + /// The data we are going to write. /// public ByteString Data { get; } /// - /// TBD + /// The optional acknowledgment event which will be sent to the sender of this command. /// public override Event Ack { get; } @@ -494,21 +426,19 @@ public override string ToString() => $"Write(bytes: {Data.Count}, ack: {Ack})"; /// - /// TBD + /// Creates a write from a /// - /// TBD - /// TBD + /// The data to return. public static Write Create(ByteString data) { return data.IsEmpty ? Empty : new Write(data, NoAck.Instance); } /// - /// TBD + /// Creates a write from a /// - /// TBD - /// TbD - /// TBD + /// The data to return. + /// The acknowledgement message we're receive once this write is complete. public static Write Create(ByteString data, Event ack) { return new Write(data, ack); @@ -523,26 +453,14 @@ public static Write Create(ByteString data, Event ack) /// If the sub commands contain `ack` requests they will be honored as soon as the /// respective write has been written completely. /// - public class CompoundWrite : WriteCommand, IEnumerable + public sealed class CompoundWrite : WriteCommand, IEnumerable { - private readonly SimpleWriteCommand _head; - private readonly WriteCommand _tailCommand; - - /// - /// TBD - /// - /// TBD - /// TBD public CompoundWrite(SimpleWriteCommand head, WriteCommand tailCommand) { - _head = head; - _tailCommand = tailCommand; + Head = head; + TailCommand = tailCommand; } - - /// - /// TBD - /// - /// TBD + public IEnumerator GetEnumerator() { return Enumerable().GetEnumerator(); @@ -558,31 +476,21 @@ private IEnumerable Enumerable() WriteCommand current = this; while (current != null) { - var compound = current as CompoundWrite; - if (compound != null) + if (current is CompoundWrite compound) { current = compound.TailCommand; yield return compound.Head; } - var simple = current as SimpleWriteCommand; - if (simple != null) - { - current = null; - yield return simple; - } + if (current is not SimpleWriteCommand simple) continue; + current = null; + yield return simple; } } + + public SimpleWriteCommand Head { get; } - /// - /// TBD - /// - public SimpleWriteCommand Head => _head; - - /// - /// TBD - /// - public WriteCommand TailCommand => _tailCommand; + public WriteCommand TailCommand { get; } public override string ToString() => $"CompoundWrite({Head}, {TailCommand})"; @@ -595,11 +503,8 @@ public override string ToString() => /// connection actor between the first and subsequent reception of /// this message will also be rejected with . /// - public class ResumeWriting : Command + public sealed class ResumeWriting : Command { - /// - /// TBD - /// public static readonly ResumeWriting Instance = new(); private ResumeWriting() @@ -612,11 +517,8 @@ private ResumeWriting() /// socket. TCP flow-control will then propagate backpressure to the sender side /// as buffers fill up on either end. To re-enable reading send . /// - public class SuspendReading : Command + public sealed class SuspendReading : Command { - /// - /// TBD - /// public static readonly SuspendReading Instance = new(); private SuspendReading() @@ -628,11 +530,8 @@ private SuspendReading() /// This command needs to be sent to the connection actor after a /// command in order to resume reading from the socket. /// - public class ResumeReading : Command + public sealed class ResumeReading : Command { - /// - /// TBD - /// public static readonly ResumeReading Instance = new(); private ResumeReading() @@ -644,24 +543,20 @@ private ResumeReading() /// This message enables the accepting of the next connection if read throttling is enabled /// for connection actors. /// - public class ResumeAccepting : Command + public sealed class ResumeAccepting : Command { /// - /// TBD + /// The number of connections to accept before resuming read throttling. /// public int BatchSize { get; } - - /// - /// TBD - /// - /// TBD + public ResumeAccepting(int batchSize) { BatchSize = batchSize; } public override string ToString() => - $"ResumeAccepting(batchSize: {BatchSize})"; + $"ResumeAccepting(BatchSize: {BatchSize})"; } #endregion @@ -770,7 +665,7 @@ public CommandFailed WithCause(Exception cause) /// the first message have been enqueued to the O/S kernel at this /// point. /// - public class WritingResumed : Event + public sealed class WritingResumed : Event { public static readonly WritingResumed Instance = new(); @@ -784,7 +679,7 @@ private WritingResumed() /// in this form. If the bind address indicated a 0 port number, then the contained /// `localAddress` can be used to find out which port was automatically assigned. /// - public class Bound : Event + public sealed class Bound : Event { /// /// The local listening endpoint of the bound socket. @@ -808,7 +703,7 @@ public override string ToString() => /// The sender of an command will receive confirmation through this /// message once the listening socket has been closed. /// - public class Unbound : Event + public sealed class Unbound : Event { /// /// Singleton instance @@ -855,7 +750,7 @@ public class ConnectionClosed : Event, IDeadLetterSuppression /// /// The connection has been closed normally in response to a command. /// - public class Closed : ConnectionClosed + public sealed class Closed : ConnectionClosed { public static readonly Closed Instance = new(); @@ -867,7 +762,7 @@ private Closed() /// /// The connection has been aborted in response to an command. /// - public class Aborted : ConnectionClosed + public sealed class Aborted : ConnectionClosed { public static readonly Aborted Instance = new(); @@ -882,7 +777,7 @@ private Aborted() /// The connection has been half-closed by us and then half-close by the peer /// in response to a command. /// - public class ConfirmedClosed : ConnectionClosed + public sealed class ConfirmedClosed : ConnectionClosed { public static readonly ConfirmedClosed Instance = new(); @@ -896,7 +791,7 @@ private ConfirmedClosed() /// /// The peer has closed its writing half of the connection. /// - public class PeerClosed : ConnectionClosed + public sealed class PeerClosed : ConnectionClosed { public static readonly PeerClosed Instance = new(); @@ -927,7 +822,7 @@ public override string ToString() => #endregion - private class ConnectionSupervisorStrategyImp : OneForOneStrategy + private sealed class ConnectionSupervisorStrategyImp : OneForOneStrategy { public ConnectionSupervisorStrategyImp() : base(StoppingStrategy.Decider) @@ -1078,8 +973,8 @@ public static Tcp.Command Bind(IActorRef handler, EndPoint endpoint, int backlog /// Registers an actor to handle an outgoing or incoming TCP connection that has been established. /// /// The actor who will be handling the TCP communication. - /// TBD - /// TBD + /// Keep the connection open if the peer is closed + /// Use resume / pause writing semantics once buffer gets full public static Tcp.Command Register(IActorRef handler, bool keepOpenOnPeerClosed = false, bool useResumeWriting = true) { From 24e2bc3bbf10645ecdf85b3ddc8126cbbfc9cd93 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 6 May 2025 15:16:17 -0500 Subject: [PATCH 05/12] remove pointless `DeadLetter` subscription --- src/core/Akka/IO/TcpManager.cs | 9 --------- 1 file changed, 9 deletions(-) diff --git a/src/core/Akka/IO/TcpManager.cs b/src/core/Akka/IO/TcpManager.cs index efbb6ad3604..2c1189008ff 100644 --- a/src/core/Akka/IO/TcpManager.cs +++ b/src/core/Akka/IO/TcpManager.cs @@ -57,7 +57,6 @@ internal sealed class TcpManager : ActorBase public TcpManager(TcpExt tcp) { _tcp = tcp; - Context.System.EventStream.Subscribe(Self, typeof(DeadLetter)); } protected override bool Receive(object message) @@ -76,14 +75,6 @@ protected override bool Receive(object message) Context.ActorOf(Props.Create(_tcp, commander, b).WithDeploy(Deploy.Local)); return true; } - case DeadLetter dl: - { - if (dl.Message is SocketCompleted completed) - { - //TODO: release resources? - } - return true; - } default: throw new ArgumentException($"The supplied message of type {message.GetType().Name} is invalid. Only Connect and Bind messages are supported. " + $"If you are going to manage your connection state, you need to communicate with Tcp.Connected sender actor. " + From 03dd2a4cd766646906f1bec828f2586690c567c2 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Tue, 6 May 2025 16:22:42 -0500 Subject: [PATCH 06/12] Akka.IO: ensure `TcpListener` doesn't do weird stuff with SAEA close #5988 --- src/core/Akka/IO/TcpListener.cs | 209 +++++++++++++++++++------------- 1 file changed, 127 insertions(+), 82 deletions(-) diff --git a/src/core/Akka/IO/TcpListener.cs b/src/core/Akka/IO/TcpListener.cs index b3db55feea7..ca61644da9e 100644 --- a/src/core/Akka/IO/TcpListener.cs +++ b/src/core/Akka/IO/TcpListener.cs @@ -18,89 +18,58 @@ namespace Akka.IO { - class TcpListener : ActorBase, IRequiresMessageQueue + /// + /// INTERNAL API + /// + /// TcpListener is an internal actor that binds to a local address and listens for incoming TCP connections. + /// + internal sealed class TcpListener : ActorBase, IRequiresMessageQueue { + /// + /// In the event that someone specified something stupid like 0 or a negative number + /// for the accept limit, this is a reasonable default. + /// + public static readonly int DefaultAcceptLimit = Environment.ProcessorCount * 2; + private readonly TcpExt _tcp; - private readonly IActorRef _bindCommander; + private readonly IActorRef _bindCommander; // forwarded destination for Connected private Tcp.Bind _bind; private Socket _socket; private readonly ILoggingAdapter _log = Context.GetLogger(); private int _acceptLimit; - private SocketAsyncEventArgs[] _saeas; + private SocketAsyncEventArgs[]? _acceptPool; private bool _binding; - /// - /// TBD - /// - /// TBD - /// TBD - /// TBD + private sealed record AcceptCompleted(SocketAsyncEventArgs EventArgs) : INoSerializationVerificationNeeded; + + private sealed record RetryAccept(SocketAsyncEventArgs EventArgs) : INoSerializationVerificationNeeded; + public TcpListener(TcpExt tcp, IActorRef bindCommander, Tcp.Bind bind) { _tcp = tcp; _bindCommander = bindCommander; - Become(Initializing()); Self.Tell(bind); } - private Receive Initializing() => message => + private Receive Bound() => message => { switch (message) { - case Tcp.Bind bind: - if (_binding) - { - _log.Warning("Already trying to bind to TCP channel on endpoint [{0}]", _bind.LocalAddress); - return true; - } - _binding = true; - _bind = bind; - _acceptLimit = bind.PullMode ? 0 : _tcp.Settings.BatchAcceptLimit; - BindAsync().PipeTo(Self); - return true; - - case Status.Failure fail: - _bindCommander.Tell(_bind.FailureMessage.WithCause(fail.Cause)); - _log.Error(fail.Cause, "Bind failed for TCP channel on endpoint [{0}]", _bind.LocalAddress); - Context.Stop(Self); - _binding = false; - return true; - - case Tcp.Bound bound: - Context.Watch(_bind.Handler); - _bindCommander.Tell(bound); - Become(Bound()); - _binding = false; + case AcceptCompleted accepted: + HandleAccept(accepted.EventArgs); return true; - default: - return false; - } - }; - - private Receive Bound() => message => - { - switch (message) - { - case SocketEvent evt: - var saea = evt.Args; - if (saea.SocketError == SocketError.Success) - Context.ActorOf(Props.Create(_tcp, saea.AcceptSocket, _bind.Handler, _bind.Options, _bind.PullMode).WithDeploy(Deploy.Local)); - - saea.AcceptSocket = null; - if (!_socket.AcceptAsync(saea)) - Self.Tell(new SocketEvent(saea)); + case RetryAccept retry: + StartAccept(retry.EventArgs); return true; - case Tcp.ResumeAccepting resumeAccepting: - _acceptLimit = resumeAccepting.BatchSize; - // TODO: this is dangerous, previous async args are not disposed and there's no guarantee that they're not still receiving data - _saeas = Accept(_acceptLimit).ToArray(); + case Tcp.ResumeAccepting: + // NO-OP - this is obsolete return true; - case Tcp.Unbind _: + case Tcp.Unbind: Become(Unbinding(Sender)); UnbindAsync().PipeTo(Self); return true; @@ -130,16 +99,48 @@ private Receive Unbinding(IActorRef requester) => message => } }; - private IEnumerable Accept(int limit) + private void StartAccept(SocketAsyncEventArgs saea) { - for(var i = 0; i < limit; i++) + + var pending = _socket.AcceptAsync(saea); + if (!pending) + Self.Tell(new AcceptCompleted(saea), Self); // synchronous completion ➔ mailbox + } + + // closure + private IActorRef _safeSelf = Context.Self; + + private void OnCompleted(object? sender, SocketAsyncEventArgs saea) + { + // Marshall back into the actor context – keeps user code off the IOCP thread. + _safeSelf.Tell(new AcceptCompleted(saea), ActorRefs.NoSender); + } + + private void HandleAccept(SocketAsyncEventArgs saea) + { + switch (saea.SocketError) { - var self = Self; - var saea = new SocketAsyncEventArgs(); - saea.Completed += (_, e) => self.Tell(new SocketEvent(e)); - if (!_socket.AcceptAsync(saea)) - Self.Tell(new SocketEvent(saea)); - yield return saea; + case SocketError.Success: + var accepted = saea.AcceptSocket!; + saea.AcceptSocket = null; // ready for re‑use + Context.ActorOf(Props.Create(_tcp, accepted, _bind.Handler, _bind.Options, _bind.PullMode).WithDeploy(Deploy.Local)); + StartAccept(saea); // keep the pool full + break; + + case SocketError.ConnectionReset: + case SocketError.NoBufferSpaceAvailable: + case SocketError.TryAgain: + case SocketError.TimedOut: + case SocketError.WouldBlock: + // transient – short back‑off then retry + saea.AcceptSocket = null; + Context.System.Scheduler.ScheduleTellOnce(TimeSpan.FromMilliseconds(10), Self, + new RetryAccept(saea), ActorRefs.NoSender); + break; + default: + _log.Error("Fatal socket error in TcpListener: {0}", saea.SocketError); + Context.Stop(Self); + break; } } @@ -155,7 +156,18 @@ private IEnumerable Accept(int limit) _bind.Options.ForEach(x => x.BeforeServerSocketBind(_socket)); _socket.Bind(_bind.LocalAddress); _socket.Listen(_bind.Backlog); - _saeas = Accept(_acceptLimit).ToArray(); + + _acceptPool = new SocketAsyncEventArgs[_acceptLimit]; + for (var i = 0; i < _acceptPool.Length; i++) + { + var saea = new SocketAsyncEventArgs(); + saea.Completed += OnCompleted; // IOCP -> this actor + _acceptPool[i] = saea; + } + + // start accepting connections + foreach (var saea in _acceptPool) + StartAccept(saea); return Task.FromResult(new Tcp.Bound(_socket.LocalEndPoint)); } @@ -186,33 +198,66 @@ protected override SupervisorStrategy SupervisorStrategy() protected override bool Receive(object message) { - throw new NotImplementedException(); + switch (message) + { + case Tcp.Bind bind: + if (_binding) + { + _log.Warning("Already trying to bind to TCP channel on endpoint [{0}]", _bind.LocalAddress); + return true; + } + _binding = true; + _bind = bind; + // not going to do pull mode + _acceptLimit = _tcp.Settings.BatchAcceptLimit; + if (_acceptLimit <= 0) + { + _log.Debug("Accept limit was set to {0}, which is unworkable. Using default value of {1} (logical CPUs * 2)", _acceptLimit, DefaultAcceptLimit); + _acceptLimit = DefaultAcceptLimit; + } + + + _log.Info("Binding TCP channel on endpoint [{0}]", _bind.LocalAddress); + + BindAsync().PipeTo(Self); + return true; + + case Status.Failure fail: + _bindCommander.Tell(_bind.FailureMessage.WithCause(fail.Cause)); + _log.Error(fail.Cause, "Bind failed for TCP channel on endpoint [{0}]", _bind.LocalAddress); + Context.Stop(Self); + _binding = false; + return true; + + case Tcp.Bound bound: + Context.Watch(_bind.Handler); + _bindCommander.Tell(bound); + Become(Bound()); + _binding = false; + return true; + + default: + return false; + } } - - /// - /// TBD - /// + protected override void PostStop() { try { + if(_acceptPool != null) + foreach(var saea in _acceptPool) + { + // remove event handler + saea.Completed -= OnCompleted; + saea.Dispose(); + } _socket?.Dispose(); - _saeas?.ForEach(x => x.Dispose()); } catch (Exception e) { _log.Debug("Error closing ServerSocketChannel: {0}", e); } } - - private readonly struct SocketEvent : INoSerializationVerificationNeeded - { - public readonly SocketAsyncEventArgs Args; - - public SocketEvent(SocketAsyncEventArgs args) - { - Args = args; - } - } } } From d16bb311712d04fec09b6d4f67d969986aa1839d Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 7 May 2025 10:20:50 -0500 Subject: [PATCH 07/12] fixed typo --- src/core/Akka/IO/TcpSettings.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/Akka/IO/TcpSettings.cs b/src/core/Akka/IO/TcpSettings.cs index 0847e49f1c5..88f9cbc0a0e 100644 --- a/src/core/Akka/IO/TcpSettings.cs +++ b/src/core/Akka/IO/TcpSettings.cs @@ -88,7 +88,7 @@ public TcpSettings( string bufferPoolConfigPath, /// /// A config path to the section defining which byte buffer pool to use. - /// Buffer pools are used to mitigate GC-pressure made by potentiall allocation + /// Buffer pools are used to mitigate GC-pressure made by potential allocation /// and deallocation of byte buffers used for writing/receiving data from sockets. /// public string BufferPoolConfigPath { get; } From eb63f8cb811a6394c6df3b4fd233360160fd5a15 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 7 May 2025 11:49:58 -0500 Subject: [PATCH 08/12] tweak `Tcp` handling code --- ...oreAPISpec.ApproveCore.DotNet.verified.txt | 3 +- .../CoreAPISpec.ApproveCore.Net.verified.txt | 3 +- src/core/Akka/IO/Tcp.cs | 4 +- src/core/Akka/IO/TcpListener.cs | 122 +++++++++++------- src/core/Akka/IO/TcpSettings.cs | 3 +- 5 files changed, 83 insertions(+), 52 deletions(-) diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt index 42477cca77a..427f0be5b53 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt @@ -3995,7 +3995,7 @@ namespace Akka.IO } public class Bind : Akka.IO.Tcp.Command { - public Bind(Akka.Actor.IActorRef handler, System.Net.EndPoint localAddress, int backlog = 100, System.Collections.Generic.IEnumerable options = null, bool pullMode = False) { } + public Bind(Akka.Actor.IActorRef handler, System.Net.EndPoint localAddress, int backlog = 1024, System.Collections.Generic.IEnumerable options = null, bool pullMode = False) { } public int Backlog { get; } public Akka.Actor.IActorRef Handler { get; } public System.Net.EndPoint LocalAddress { get; } @@ -4214,6 +4214,7 @@ namespace Akka.IO public class TcpSettings { public TcpSettings(string bufferPoolConfigPath, int initialSocketAsyncEventArgs, bool traceLogging, int batchAcceptLimit, System.Nullable registerTimeout, int receivedMessageSizeLimit, string managementDispatcher, string fileIoDispatcher, int transferToLimit, int finishConnectRetries, bool outgoingSocketForceIpv4, int writeCommandsQueueMaxSize) { } + [System.ObsoleteAttribute("This setting is deprecated and will be removed in a future version.")] public int BatchAcceptLimit { get; } public string BufferPoolConfigPath { get; } public string FileIODispatcher { get; } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt index 19c79d6e562..b1487755e3b 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt @@ -3985,7 +3985,7 @@ namespace Akka.IO } public class Bind : Akka.IO.Tcp.Command { - public Bind(Akka.Actor.IActorRef handler, System.Net.EndPoint localAddress, int backlog = 100, System.Collections.Generic.IEnumerable options = null, bool pullMode = False) { } + public Bind(Akka.Actor.IActorRef handler, System.Net.EndPoint localAddress, int backlog = 1024, System.Collections.Generic.IEnumerable options = null, bool pullMode = False) { } public int Backlog { get; } public Akka.Actor.IActorRef Handler { get; } public System.Net.EndPoint LocalAddress { get; } @@ -4204,6 +4204,7 @@ namespace Akka.IO public class TcpSettings { public TcpSettings(string bufferPoolConfigPath, int initialSocketAsyncEventArgs, bool traceLogging, int batchAcceptLimit, System.Nullable registerTimeout, int receivedMessageSizeLimit, string managementDispatcher, string fileIoDispatcher, int transferToLimit, int finishConnectRetries, bool outgoingSocketForceIpv4, int writeCommandsQueueMaxSize) { } + [System.ObsoleteAttribute("This setting is deprecated and will be removed in a future version.")] public int BatchAcceptLimit { get; } public string BufferPoolConfigPath { get; } public string FileIODispatcher { get; } diff --git a/src/core/Akka/IO/Tcp.cs b/src/core/Akka/IO/Tcp.cs index a36b6d5465a..1bb8a69f522 100644 --- a/src/core/Akka/IO/Tcp.cs +++ b/src/core/Akka/IO/Tcp.cs @@ -156,12 +156,12 @@ public class Bind : Command /// /// The actor who will be handling the TCP listener. /// The local endpoint we are binding to. - /// TCP backlog - the number of pending connections that the queue will hold. + /// TCP backlog - the number of pending connections that the queue will hold. Defaults to 1024. /// A set of socket options. /// Specifies whether we're running in "pull mode" or not. public Bind(IActorRef handler, EndPoint localAddress, - int backlog = 100, + int backlog = 1024, IEnumerable options = null, bool pullMode = false) { diff --git a/src/core/Akka/IO/TcpListener.cs b/src/core/Akka/IO/TcpListener.cs index ca61644da9e..87950cb6cff 100644 --- a/src/core/Akka/IO/TcpListener.cs +++ b/src/core/Akka/IO/TcpListener.cs @@ -6,18 +6,33 @@ //----------------------------------------------------------------------- using System; -using System.Net; using System.Net.Sockets; using Akka.Actor; using Akka.Dispatch; using Akka.Event; using Akka.Util.Internal; -using System.Collections.Generic; -using System.Linq; using System.Threading.Tasks; namespace Akka.IO { + /// + /// SocketAsyncEventArgs is a wrapper around SocketAsyncEventArgs that allows us to deliver + /// notifications to actors upon completion of the operation. + /// + internal sealed class SocketAsyncActorEventArgs : SocketAsyncEventArgs + { + public SocketAsyncActorEventArgs(IActorRef notifyMe, EventHandler onCompleted) + { + NotifyMe = notifyMe; + Completed += onCompleted; + } + + /// + /// The actor we're going to notify once the operation is completed. + /// + public IActorRef NotifyMe { get; } + } + /// /// INTERNAL API /// @@ -30,26 +45,27 @@ internal sealed class TcpListener : ActorBase, IRequiresMessageQueue public static readonly int DefaultAcceptLimit = Environment.ProcessorCount * 2; - + private readonly TcpExt _tcp; - private readonly IActorRef _bindCommander; // forwarded destination for Connected + private readonly IActorRef _bindCommander; // forwarded destination for Connected private Tcp.Bind _bind; private Socket _socket; private readonly ILoggingAdapter _log = Context.GetLogger(); - private int _acceptLimit; - private SocketAsyncEventArgs[]? _acceptPool; + private readonly int _acceptLimit = DefaultAcceptLimit; + private SocketAsyncActorEventArgs[]? _acceptPool; private bool _binding; + private static readonly EventHandler OnCompleted = OnIoCompleted; private sealed record AcceptCompleted(SocketAsyncEventArgs EventArgs) : INoSerializationVerificationNeeded; private sealed record RetryAccept(SocketAsyncEventArgs EventArgs) : INoSerializationVerificationNeeded; - + public TcpListener(TcpExt tcp, IActorRef bindCommander, Tcp.Bind bind) { _tcp = tcp; _bindCommander = bindCommander; - + Self.Tell(bind); } @@ -60,7 +76,7 @@ private Receive Bound() => message => case AcceptCompleted accepted: HandleAccept(accepted.EventArgs); return true; - + case RetryAccept retry: StartAccept(retry.EventArgs); return true; @@ -74,6 +90,10 @@ private Receive Bound() => message => UnbindAsync().PipeTo(Self); return true; + case Status.Failure failure: + _log.Error(failure.Cause, "Received SocketAsyncEventArgs failure"); + return true; + default: return false; } @@ -88,43 +108,58 @@ private Receive Unbinding(IActorRef requester) => message => _log.Debug("Unbound endpoint {0}, stopping listener", _bind.LocalAddress); Context.Stop(Self); return true; - + case Status.Failure fail: _log.Error(fail.Cause, "Failed to unbind TCP listener for address [{0}]", _bind.LocalAddress); Context.Stop(Self); return true; - + default: return false; } }; - + private void StartAccept(SocketAsyncEventArgs saea) { - var pending = _socket.AcceptAsync(saea); if (!pending) Self.Tell(new AcceptCompleted(saea), Self); // synchronous completion ➔ mailbox } - // closure - private IActorRef _safeSelf = Context.Self; - - private void OnCompleted(object? sender, SocketAsyncEventArgs saea) + private static void OnIoCompleted(object? sender, SocketAsyncEventArgs saea) { // Marshall back into the actor context – keeps user code off the IOCP thread. - _safeSelf.Tell(new AcceptCompleted(saea), ActorRefs.NoSender); + var actorArgs = (SocketAsyncActorEventArgs)saea; + + var actor = actorArgs.NotifyMe; + if (actorArgs.LastOperation == SocketAsyncOperation.Accept) + { + actor.Tell(new AcceptCompleted(saea), actor); + } + else // should never happen + { + // This should never happen, but just in case. + var ioe = new InvalidOperationException( + $"SocketAsyncEventArgs last operation is not Accept: {actorArgs.LastOperation}"); + actor.Tell(new Status.Failure(ioe), actor); + + // retry the operation + actorArgs.AcceptSocket = null; + actor.Tell(new RetryAccept(actorArgs)); + } } - + private void HandleAccept(SocketAsyncEventArgs saea) { switch (saea.SocketError) { case SocketError.Success: var accepted = saea.AcceptSocket!; - saea.AcceptSocket = null; // ready for re‑use - Context.ActorOf(Props.Create(_tcp, accepted, _bind.Handler, _bind.Options, _bind.PullMode).WithDeploy(Deploy.Local)); - StartAccept(saea); // keep the pool full + saea.AcceptSocket = null; // ready for re‑use + Context.ActorOf(Props + .Create(_tcp, accepted, _bind.Handler, _bind.Options, _bind.PullMode) + .WithDeploy(Deploy.Local)); + StartAccept(saea); // keep the pool full break; case SocketError.ConnectionReset: @@ -156,15 +191,14 @@ private void HandleAccept(SocketAsyncEventArgs saea) _bind.Options.ForEach(x => x.BeforeServerSocketBind(_socket)); _socket.Bind(_bind.LocalAddress); _socket.Listen(_bind.Backlog); - - _acceptPool = new SocketAsyncEventArgs[_acceptLimit]; + + _acceptPool = new SocketAsyncActorEventArgs[_acceptLimit]; for (var i = 0; i < _acceptPool.Length; i++) { - var saea = new SocketAsyncEventArgs(); - saea.Completed += OnCompleted; // IOCP -> this actor + var saea = new SocketAsyncActorEventArgs(Self, OnCompleted); _acceptPool[i] = saea; } - + // start accepting connections foreach (var saea in _acceptPool) StartAccept(saea); @@ -190,7 +224,7 @@ private void HandleAccept(SocketAsyncEventArgs saea) return Task.FromException(ex); } } - + protected override SupervisorStrategy SupervisorStrategy() { return Tcp.ConnectionSupervisorStrategy; @@ -206,52 +240,46 @@ protected override bool Receive(object message) _log.Warning("Already trying to bind to TCP channel on endpoint [{0}]", _bind.LocalAddress); return true; } + _binding = true; _bind = bind; - // not going to do pull mode - _acceptLimit = _tcp.Settings.BatchAcceptLimit; - if (_acceptLimit <= 0) - { - _log.Debug("Accept limit was set to {0}, which is unworkable. Using default value of {1} (logical CPUs * 2)", _acceptLimit, DefaultAcceptLimit); - _acceptLimit = DefaultAcceptLimit; - } - - - _log.Info("Binding TCP channel on endpoint [{0}]", _bind.LocalAddress); - + + _log.Info("Binding TCP channel on endpoint [{0}]", _bind.LocalAddress); + BindAsync().PipeTo(Self); return true; - + case Status.Failure fail: _bindCommander.Tell(_bind.FailureMessage.WithCause(fail.Cause)); _log.Error(fail.Cause, "Bind failed for TCP channel on endpoint [{0}]", _bind.LocalAddress); Context.Stop(Self); _binding = false; return true; - + case Tcp.Bound bound: Context.Watch(_bind.Handler); _bindCommander.Tell(bound); Become(Bound()); _binding = false; return true; - + default: return false; } } - + protected override void PostStop() { try { - if(_acceptPool != null) - foreach(var saea in _acceptPool) + if (_acceptPool != null) + foreach (var saea in _acceptPool) { // remove event handler saea.Completed -= OnCompleted; saea.Dispose(); } + _socket?.Dispose(); } catch (Exception e) @@ -260,4 +288,4 @@ protected override void PostStop() } } } -} +} \ No newline at end of file diff --git a/src/core/Akka/IO/TcpSettings.cs b/src/core/Akka/IO/TcpSettings.cs index 88f9cbc0a0e..5613cc978bb 100644 --- a/src/core/Akka/IO/TcpSettings.cs +++ b/src/core/Akka/IO/TcpSettings.cs @@ -75,7 +75,7 @@ public TcpSettings( string bufferPoolConfigPath, BufferPoolConfigPath = bufferPoolConfigPath; InitialSocketAsyncEventArgs = initialSocketAsyncEventArgs; TraceLogging = traceLogging; - BatchAcceptLimit = batchAcceptLimit; + // BatchAcceptLimit = batchAcceptLimit; // not used, remove this setting in v1.6 RegisterTimeout = registerTimeout; ReceivedMessageSizeLimit = receivedMessageSizeLimit; ManagementDispatcher = managementDispatcher; @@ -111,6 +111,7 @@ public TcpSettings( string bufferPoolConfigPath, /// numbers decrease latency, lower numbers increase fairness on the /// worker-dispatcher /// + [Obsolete("This setting is deprecated and will be removed in a future version.")] public int BatchAcceptLimit { get; } /// From 4d2913d9af9f84e307f73d068ce38b180d34dc1b Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 7 May 2025 14:21:10 -0500 Subject: [PATCH 09/12] added updated API approvals --- .../verify/CoreAPISpec.ApproveCore.DotNet.verified.txt | 1 - .../verify/CoreAPISpec.ApproveCore.Net.verified.txt | 1 - 2 files changed, 2 deletions(-) diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt index 54a51498c07..dbed03673ec 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt @@ -4089,7 +4089,6 @@ namespace Akka.IO public sealed class ErrorClosed : Akka.IO.Tcp.ConnectionClosed { public ErrorClosed(string cause) { } - [System.Runtime.CompilerServices.NullableAttribute(2)] public override string Cause { get; } public override bool IsErrorClosed { get; } public override string ToString() { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt index 9359417d8b3..84fa74b8100 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt @@ -4079,7 +4079,6 @@ namespace Akka.IO public sealed class ErrorClosed : Akka.IO.Tcp.ConnectionClosed { public ErrorClosed(string cause) { } - [System.Runtime.CompilerServices.NullableAttribute(2)] public override string Cause { get; } public override bool IsErrorClosed { get; } public override string ToString() { } From eb8db4291c1f7b27571cca3d573d4b30d4f59110 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 7 May 2025 14:51:57 -0500 Subject: [PATCH 10/12] restoring ability to the `akka.io.tcp.batch-accept-limit` to be configurable --- src/core/Akka.Tests/IO/TcpListenerSpec.cs | 10 ++--- src/core/Akka/Configuration/akka.conf | 7 +++- src/core/Akka/IO/TcpListener.cs | 19 +++++---- src/core/Akka/IO/TcpSettings.cs | 48 ++++++++++++++--------- 4 files changed, 49 insertions(+), 35 deletions(-) diff --git a/src/core/Akka.Tests/IO/TcpListenerSpec.cs b/src/core/Akka.Tests/IO/TcpListenerSpec.cs index a6b845ad47d..e737eeddb11 100644 --- a/src/core/Akka.Tests/IO/TcpListenerSpec.cs +++ b/src/core/Akka.Tests/IO/TcpListenerSpec.cs @@ -75,7 +75,6 @@ class TestSetup private readonly IActorRef _handlerRef; private readonly TestProbe _bindCommander; private readonly TestProbe _parent; - private readonly TestProbe _selectorRouter; private readonly TestActorRef _parentRef; public TestSetup(TestKitBase kit, bool pullMode) @@ -87,7 +86,7 @@ public TestSetup(TestKitBase kit, bool pullMode) _handlerRef = _handler.Ref; _bindCommander = kit.CreateTestProbe(); _parent = kit.CreateTestProbe(); - _selectorRouter = kit.CreateTestProbe(); + SelectorRouter = kit.CreateTestProbe(); _parentRef = new TestActorRef(kit.Sys, Props.Create(() => new ListenerParent(this, pullMode))); } @@ -115,10 +114,7 @@ public async Task AttemptConnectionToEndpoint() public IActorRef Listener { get { return _parentRef.UnderlyingActor.Listener; } } - public TestProbe SelectorRouter - { - get { return _selectorRouter; } - } + public TestProbe SelectorRouter { get; } public TestProbe BindCommander { get { return _bindCommander; } } public TestProbe Parent { get { return _parent; } } @@ -128,7 +124,7 @@ public TestProbe SelectorRouter internal void AfterBind(Socket socket) => LocalEndPoint = (IPEndPoint)socket.LocalEndPoint; - class ListenerParent : ActorBase + private class ListenerParent : ActorBase { private readonly TestSetup _test; private readonly bool _pullMode; diff --git a/src/core/Akka/Configuration/akka.conf b/src/core/Akka/Configuration/akka.conf index 1a11eb86a71..602d7c8d4ef 100644 --- a/src/core/Akka/Configuration/akka.conf +++ b/src/core/Akka/Configuration/akka.conf @@ -763,8 +763,11 @@ akka { # The maximum number of connection that are accepted in one go, # higher numbers decrease latency, lower numbers increase fairness on - # the worker-dispatcher - batch-accept-limit = 10 + # the worker-dispatcher. + # + # By default we scale to logical CPUs * 2, but you can set this to an + # integer value greater than 0. + batch-accept-limit = scale-to-cpus # The duration a connection actor waits for a `Register` message from # its commander before aborting the connection. diff --git a/src/core/Akka/IO/TcpListener.cs b/src/core/Akka/IO/TcpListener.cs index 87950cb6cff..a7312f2d88e 100644 --- a/src/core/Akka/IO/TcpListener.cs +++ b/src/core/Akka/IO/TcpListener.cs @@ -40,18 +40,12 @@ public SocketAsyncActorEventArgs(IActorRef notifyMe, EventHandler internal sealed class TcpListener : ActorBase, IRequiresMessageQueue { - /// - /// In the event that someone specified something stupid like 0 or a negative number - /// for the accept limit, this is a reasonable default. - /// - public static readonly int DefaultAcceptLimit = Environment.ProcessorCount * 2; - private readonly TcpExt _tcp; private readonly IActorRef _bindCommander; // forwarded destination for Connected private Tcp.Bind _bind; private Socket _socket; private readonly ILoggingAdapter _log = Context.GetLogger(); - private readonly int _acceptLimit = DefaultAcceptLimit; + private readonly int _acceptLimit; private SocketAsyncActorEventArgs[]? _acceptPool; private bool _binding; private static readonly EventHandler OnCompleted = OnIoCompleted; @@ -64,6 +58,17 @@ public TcpListener(TcpExt tcp, IActorRef bindCommander, Tcp.Bind bind) { _tcp = tcp; + _acceptLimit = tcp.Settings.BatchAcceptLimit; + + if (_acceptLimit <= 0) + { + _log.Warning("Batch accept limit is set to {0}, which is less than or equal to 0. " + + "This value will HANG the listener.", _acceptLimit);; + + _acceptLimit = TcpSettings.DefaultAcceptLimit; + _log.Warning("Using default value of {0} for batch accept limit", _acceptLimit); + } + _bindCommander = bindCommander; Self.Tell(bind); diff --git a/src/core/Akka/IO/TcpSettings.cs b/src/core/Akka/IO/TcpSettings.cs index 5613cc978bb..33c19e0316c 100644 --- a/src/core/Akka/IO/TcpSettings.cs +++ b/src/core/Akka/IO/TcpSettings.cs @@ -25,7 +25,11 @@ public static TcpSettings Create(ActorSystem system) { var config = system.Settings.Config.GetConfig("akka.io.tcp"); if (config.IsNullOrEmpty()) - throw ConfigurationException.NullOrEmptyConfig("akka.io.tcp");//($"Failed to create {typeof(TcpSettings)}: akka.io.tcp configuration node not found"); + throw + ConfigurationException + .NullOrEmptyConfig< + TcpSettings>( + "akka.io.tcp"); //($"Failed to create {typeof(TcpSettings)}: akka.io.tcp configuration node not found"); return Create(config); } @@ -44,7 +48,9 @@ public static TcpSettings Create(Config config) bufferPoolConfigPath: config.GetString("buffer-pool", "akka.io.tcp.disabled-buffer-pool"), initialSocketAsyncEventArgs: config.GetInt("nr-of-socket-async-event-args", 32), traceLogging: config.GetBoolean("trace-logging", false), - batchAcceptLimit: config.GetInt("batch-accept-limit", 10), + batchAcceptLimit: config.GetString("batch-accept-limit") == "scale-to-cpus" + ? DefaultAcceptLimit + : config.GetInt("batch-accept-limit", DefaultAcceptLimit), registerTimeout: config.GetTimeSpan("register-timeout", TimeSpan.FromSeconds(5)), receivedMessageSizeLimit: config.GetString("max-received-message-size", "unlimited") == "unlimited" ? int.MaxValue @@ -59,23 +65,28 @@ public static TcpSettings Create(Config config) writeCommandsQueueMaxSize: config.GetInt("write-commands-queue-max-size", -1)); } - public TcpSettings( string bufferPoolConfigPath, - int initialSocketAsyncEventArgs, - bool traceLogging, - int batchAcceptLimit, - TimeSpan? registerTimeout, - int receivedMessageSizeLimit, - string managementDispatcher, - string fileIoDispatcher, - int transferToLimit, - int finishConnectRetries, - bool outgoingSocketForceIpv4, - int writeCommandsQueueMaxSize) + /// + /// Default size of the SAEA pool + /// + internal static readonly int DefaultAcceptLimit = Environment.ProcessorCount * 2; + + public TcpSettings(string bufferPoolConfigPath, + int initialSocketAsyncEventArgs, + bool traceLogging, + int batchAcceptLimit, + TimeSpan? registerTimeout, + int receivedMessageSizeLimit, + string managementDispatcher, + string fileIoDispatcher, + int transferToLimit, + int finishConnectRetries, + bool outgoingSocketForceIpv4, + int writeCommandsQueueMaxSize) { BufferPoolConfigPath = bufferPoolConfigPath; InitialSocketAsyncEventArgs = initialSocketAsyncEventArgs; TraceLogging = traceLogging; - // BatchAcceptLimit = batchAcceptLimit; // not used, remove this setting in v1.6 + BatchAcceptLimit = batchAcceptLimit; RegisterTimeout = registerTimeout; ReceivedMessageSizeLimit = receivedMessageSizeLimit; ManagementDispatcher = managementDispatcher; @@ -111,9 +122,8 @@ public TcpSettings( string bufferPoolConfigPath, /// numbers decrease latency, lower numbers increase fairness on the /// worker-dispatcher /// - [Obsolete("This setting is deprecated and will be removed in a future version.")] public int BatchAcceptLimit { get; } - + /// /// The duration a connection actor waits for a `Register` message from /// its commander before aborting the connection. @@ -166,7 +176,7 @@ public TcpSettings( string bufferPoolConfigPath, /// in cases when DnsEndPoint is used to describe the remote address /// public bool OutgoingSocketForceIpv4 { get; } - + /// /// Limits maximum size of internal queue, used in connection actor /// to store pending write commands. @@ -174,4 +184,4 @@ public TcpSettings( string bufferPoolConfigPath, /// public int WriteCommandsQueueMaxSize { get; } } -} +} \ No newline at end of file From ff361331d7ddd32d8dab1a7e807957ec2f93eb45 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 7 May 2025 14:55:18 -0500 Subject: [PATCH 11/12] added API approvals --- .../verify/CoreAPISpec.ApproveCore.DotNet.verified.txt | 1 - .../verify/CoreAPISpec.ApproveCore.Net.verified.txt | 1 - 2 files changed, 2 deletions(-) diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt index dbed03673ec..9f58233cba3 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.DotNet.verified.txt @@ -4214,7 +4214,6 @@ namespace Akka.IO public class TcpSettings { public TcpSettings(string bufferPoolConfigPath, int initialSocketAsyncEventArgs, bool traceLogging, int batchAcceptLimit, System.Nullable registerTimeout, int receivedMessageSizeLimit, string managementDispatcher, string fileIoDispatcher, int transferToLimit, int finishConnectRetries, bool outgoingSocketForceIpv4, int writeCommandsQueueMaxSize) { } - [System.ObsoleteAttribute("This setting is deprecated and will be removed in a future version.")] public int BatchAcceptLimit { get; } public string BufferPoolConfigPath { get; } public string FileIODispatcher { get; } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt index 84fa74b8100..605bc1f5525 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveCore.Net.verified.txt @@ -4204,7 +4204,6 @@ namespace Akka.IO public class TcpSettings { public TcpSettings(string bufferPoolConfigPath, int initialSocketAsyncEventArgs, bool traceLogging, int batchAcceptLimit, System.Nullable registerTimeout, int receivedMessageSizeLimit, string managementDispatcher, string fileIoDispatcher, int transferToLimit, int finishConnectRetries, bool outgoingSocketForceIpv4, int writeCommandsQueueMaxSize) { } - [System.ObsoleteAttribute("This setting is deprecated and will be removed in a future version.")] public int BatchAcceptLimit { get; } public string BufferPoolConfigPath { get; } public string FileIODispatcher { get; } From 4c31bc553a1d5cf615fff168a521b9c306931872 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Wed, 7 May 2025 15:02:31 -0500 Subject: [PATCH 12/12] added specs to validate `TcpSettings` --- src/core/Akka.Tests/IO/TcpSettingsSpec.cs | 42 +++++++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 src/core/Akka.Tests/IO/TcpSettingsSpec.cs diff --git a/src/core/Akka.Tests/IO/TcpSettingsSpec.cs b/src/core/Akka.Tests/IO/TcpSettingsSpec.cs new file mode 100644 index 00000000000..b9b55059787 --- /dev/null +++ b/src/core/Akka.Tests/IO/TcpSettingsSpec.cs @@ -0,0 +1,42 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2025 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using Akka.Actor; +using Akka.Configuration; +using Akka.IO; +using FluentAssertions; +using Xunit; + +namespace Akka.Tests.IO +{ + public class TcpSettingsSpec + { + [Fact] + public void TcpSettings_should_parse_all_akka_io_tcp_config_values_correctly() + { + // Arrange: load the default reference config + var config = ConfigurationFactory.Default(); + var tcpConfig = config.GetConfig("akka.io.tcp"); + var settings = TcpSettings.Create(tcpConfig); + + // Assert: all values match akka.conf reference + settings.BufferPoolConfigPath.Should().Be("akka.io.tcp.disabled-buffer-pool"); + settings.InitialSocketAsyncEventArgs.Should().Be(32); + settings.TraceLogging.Should().BeFalse(); + settings.BatchAcceptLimit.Should().Be(Environment.ProcessorCount * 2); + settings.RegisterTimeout.Should().Be(TimeSpan.FromSeconds(5)); + settings.ReceivedMessageSizeLimit.Should().Be(int.MaxValue); + settings.ManagementDispatcher.Should().Be("akka.actor.internal-dispatcher"); + settings.FileIODispatcher.Should().Be("akka.actor.default-blocking-io-dispatcher"); + settings.TransferToLimit.Should().Be(524288); // 512 KiB + settings.FinishConnectRetries.Should().Be(5); + settings.OutgoingSocketForceIpv4.Should().BeFalse(); + settings.WriteCommandsQueueMaxSize.Should().Be(-1); + } + } +} \ No newline at end of file