diff --git a/src/core/Akka.Tests/IO/TcpIntegrationSpec.cs b/src/core/Akka.Tests/IO/TcpIntegrationSpec.cs
index 9cdf33f8b2e..8df691bcff0 100644
--- a/src/core/Akka.Tests/IO/TcpIntegrationSpec.cs
+++ b/src/core/Akka.Tests/IO/TcpIntegrationSpec.cs
@@ -518,6 +518,40 @@ public async Task Should_report_Error_only_once_when_connecting_to_unreachable_D
             reply.Should().NotBeNull();
         }
 
+        [Fact]
+        public async Task Should_report_CommandFailed_when_outgoing_connection_is_refused()
+        {
+            // Regression guard for https://github.com/akkadotnet/akka.net/issues/8195
+            //
+            // When a connect attempt fails, TcpOutgoingConnection schedules a retry. The retry
+            // used to run on the scheduler thread, so any exception it threw (e.g.
+            // PlatformNotSupportedException on Linux when reusing a socket after a failed
+            // connect) was logged and swallowed by the scheduler - the commander was never
+            // told and stayed stuck forever. The retry now runs inside the actor's message
+            // loop, so any such failure is surfaced to the commander as Tcp.CommandFailed.
+            //
+            // The commander must always end up receiving CommandFailed rather than hanging.
+            var connectCommander = CreateTestProbe();
+
+            // Bind a socket to grab a free loopback port, then release it so the
+            // connection attempt is refused.
+            int port;
+            using (var probeSocket = new Socket(SocketType.Stream, ProtocolType.Tcp))
+            {
+                probeSocket.Bind(new IPEndPoint(IPAddress.Loopback, 0));
+                port = ((IPEndPoint)probeSocket.LocalEndPoint).Port;
+            }
+
+            // A connect timeout makes the outcome deterministic across platforms: on Linux the
+            // refused connection fails fast and drives the retry path, while on platforms where
+            // connecting to an unused port does not fail fast (e.g. Windows), the connect-timeout
+            // ReceiveTimeout still guarantees CommandFailed is delivered.
+            connectCommander.Send(Sys.Tcp(),
+                new Tcp.Connect(new IPEndPoint(IPAddress.Loopback, port), timeout: TimeSpan.FromSeconds(3)));
+
+            await connectCommander.ExpectMsgAsync<Tcp.CommandFailed>(TimeSpan.FromSeconds(20));
+        }
+
         [Fact]
         public async Task The_TCP_transport_implementation_handle_tcp_connection_actor_death_properly()
         {
diff --git a/src/core/Akka/IO/TcpOutgoingConnection.cs b/src/core/Akka/IO/TcpOutgoingConnection.cs
index 07021fe85d1..538f07a6baa 100644
--- a/src/core/Akka/IO/TcpOutgoingConnection.cs
+++ b/src/core/Akka/IO/TcpOutgoingConnection.cs
@@ -23,8 +23,10 @@ namespace Akka.IO
     /// An actor handling the connection state machine for an outgoing connection
     /// to be established.
     /// </summary>
-    internal sealed class TcpOutgoingConnection : TcpConnection
+    internal sealed class TcpOutgoingConnection : TcpConnection, IWithTimers
     {
+        private const string RetryConnectTimerKey = "retry-connect";
+
         private readonly IActorRef _commander;
         private readonly Tcp.Connect _connect;
 
@@ -33,6 +35,19 @@ internal sealed class TcpOutgoingConnection : TcpConnection
         private readonly ConnectException _finishConnectNeverReturnedTrueException =
             new("Could not establish connection because finishConnect never returned true");
 
+        public ITimerScheduler Timers { get; set; } = null!;
+
+        /// <summary>
+        /// Internal trigger used to re-attempt an outgoing connection from inside the
+        /// actor's own message loop, so connect failures are reported as <see cref="Tcp.CommandFailed"/>
+        /// instead of being swallowed by the scheduler thread.
+        /// </summary>
+        private sealed class RetryConnect
+        {
+            public static readonly RetryConnect Instance = new();
+            private RetryConnect() { }
+        }
+
         public TcpOutgoingConnection(TcpExt tcp, IActorRef commander, Tcp.Connect connect)
             : base(
                 (connect.TcpSettings ?? tcp.Settings),
@@ -204,12 +219,7 @@ private void Connecting(int remainingFinishConnectRetries, SocketAsyncEventArgs
                 {
                     case > 0:
                     {
-                        var self = Self;
-                        Context.System.Scheduler.Advanced.ScheduleOnce(TimeSpan.FromMilliseconds(1), () =>
-                        {
-                            if (!Socket.ConnectAsync(args))
-                                self.Tell(IO.Tcp.SocketConnected.Instance);
-                        });
+                        ScheduleConnectRetry();
                         Become(() => Connecting(remainingFinishConnectRetries - 1, args));
                         break;
                     }
@@ -220,6 +230,18 @@ private void Connecting(int remainingFinishConnectRetries, SocketAsyncEventArgs
                         break;
                     }
                 }
             });
+            Receive<RetryConnect>(_ =>
+            {
+                // Re-attempt the connection from within the actor's message loop so that any
+                // exception (e.g. PlatformNotSupportedException on Linux when reusing a socket
+                // after a failed connect attempt) is caught by ReportConnectFailure and reported
+                // to the commander as Tcp.CommandFailed instead of being swallowed by the scheduler.
+                ReportConnectFailure(() =>
+                {
+                    if (!Socket.ConnectAsync(args))
+                        Self.Tell(IO.Tcp.SocketConnected.Instance);
+                });
+            });
             Receive<ReceiveTimeout>(_ =>
             {
                 if (_connect.Timeout.HasValue) Context.SetReceiveTimeout(null);
@@ -227,6 +249,9 @@ private void Connecting(int remainingFinishConnectRetries, SocketAsyncEventArgs
                 Stop(new ConnectException($"Connect timeout of {_connect.Timeout} expired"));
             });
         }
+
+        private void ScheduleConnectRetry()
+            => Timers.StartSingleTimer(RetryConnectTimerKey, RetryConnect.Instance, TimeSpan.FromMilliseconds(1));
     }
 
     [InternalApi]