diff --git a/src/core/Akka.Tests/IO/TcpIntegrationSpec.cs b/src/core/Akka.Tests/IO/TcpIntegrationSpec.cs
index 03e052ff85b..f0616121ebf 100644
--- a/src/core/Akka.Tests/IO/TcpIntegrationSpec.cs
+++ b/src/core/Akka.Tests/IO/TcpIntegrationSpec.cs
@@ -502,6 +502,50 @@ public async Task Should_report_Error_only_once_when_connecting_to_unreachable_D
             replies.Count.ShouldBe(1);
         }
 
+        [Fact]
+        public async Task Should_report_CommandFailed_when_a_scheduled_connect_retry_throws()
+        {
+            // Regression test for https://github.com/akkadotnet/akka.net/issues/8195
+            //
+            // When a connect attempt fails, TcpOutgoingConnection schedules a retry. That retry
+            // used to run on the scheduler thread, so any exception it threw (e.g.
+            // PlatformNotSupportedException on Linux when reusing a socket after a failed
+            // connect) was logged and swallowed by the scheduler - the commander was never
+            // told and stayed stuck forever.
+            //
+            // This exercises the same code path deterministically and cross-platform: an
+            // IPv4-forced socket is driven down the DNS-fallback retry path and asked to
+            // retry against an IPv6 address, which throws NotSupportedException everywhere.
+            // The fix must surface that failure to the commander as Tcp.CommandFailed.
+
+            const string host = "akka-test-8195.invalid";
+
+            // Pre-seed the DNS cache so the host resolves to BOTH an IPv4 and an IPv6
+            // address, forcing TcpOutgoingConnection down its DNS-fallback retry path.
+            var cache = (SimpleDnsCache)Akka.IO.Dns.Instance.Apply(Sys).Cache;
+            cache.Put(
+                new Akka.IO.Dns.Resolved(host, new[] { IPAddress.Loopback }, new[] { IPAddress.IPv6Loopback }),
+                ttl: (long)TimeSpan.FromMinutes(10).TotalMilliseconds);
+
+            // Grab a free loopback port then release it, so the connection attempt is refused.
+            int port;
+            using (var probeSocket = new Socket(SocketType.Stream, ProtocolType.Tcp))
+            {
+                probeSocket.Bind(new IPEndPoint(IPAddress.Loopback, 0));
+                port = ((IPEndPoint)probeSocket.LocalEndPoint).Port;
+            }
+
+            // OutgoingSocketForceIpv4 => the connection actor allocates an IPv4-only socket,
+            // so the IPv6 fallback retry throws NotSupportedException.
+            var settings = TcpSettings.Create(Sys) with { OutgoingSocketForceIpv4 = true };
+
+            var connectCommander = CreateTestProbe();
+            connectCommander.Send(Sys.Tcp(),
+                new Tcp.Connect(new DnsEndPoint(host, port)) { TcpSettings = settings });
+
+            await connectCommander.ExpectMsgAsync<Tcp.CommandFailed>(TimeSpan.FromSeconds(10));
+        }
+
         [Fact]
         public async Task The_TCP_transport_implementation_handle_tcp_connection_actor_death_properly()
         {
diff --git a/src/core/Akka/IO/TcpOutgoingConnection.cs b/src/core/Akka/IO/TcpOutgoingConnection.cs
index 6f0f7764ca9..8ad51d83667 100644
--- a/src/core/Akka/IO/TcpOutgoingConnection.cs
+++ b/src/core/Akka/IO/TcpOutgoingConnection.cs
@@ -20,8 +20,10 @@ namespace Akka.IO
     /// An actor handling the connection state machine for an outgoing connection
     /// to be established.
     /// </summary>
-    internal sealed class TcpOutgoingConnection : TcpConnection
+    internal sealed class TcpOutgoingConnection : TcpConnection, IWithTimers
     {
+        private const string RetryConnectTimerKey = "retry-connect";
+
         private readonly IActorRef _commander;
         private readonly Tcp.Connect _connect;
 
@@ -30,6 +32,19 @@ internal sealed class TcpOutgoingConnection : TcpConnection
         private readonly ConnectException _finishConnectNeverReturnedTrueException =
             new("Could not establish connection because finishConnect never returned true");
 
+        public ITimerScheduler Timers { get; set; }
+
+        /// <summary>
+        /// Internal trigger used to re-attempt an outgoing connection from inside the
+        /// actor's own message loop, so connect failures are reported as
+        /// <see cref="Tcp.CommandFailed"/> instead of being swallowed by the scheduler thread.
+        /// </summary>
+        private sealed class RetryConnect
+        {
+            public static readonly RetryConnect Instance = new();
+            private RetryConnect() { }
+        }
+
         public TcpOutgoingConnection(TcpExt tcp, IActorRef commander, Tcp.Connect connect)
             : base(
                 (connect.TcpSettings ?? tcp.Settings),
@@ -210,25 +225,15 @@ private void Connecting(int remainingFinishConnectRetries, SocketAsyncEventArgs
                     // used only when we've resolved a DNS endpoint.
                     case > 0 when fallbackAddress != null:
                     {
-                        var self = Self;
                         var previousAddress = (IPEndPoint)args.RemoteEndPoint;
                         args.RemoteEndPoint = fallbackAddress;
-                        Context.System.Scheduler.Advanced.ScheduleOnce(TimeSpan.FromMilliseconds(1), () =>
-                        {
-                            if (!Socket.ConnectAsync(args))
-                                self.Tell(IO.Tcp.SocketConnected.Instance);
-                        });
+                        ScheduleConnectRetry();
                         Become(() => Connecting(remainingFinishConnectRetries - 1, args, previousAddress));
                         break;
                     }
                     case > 0:
                     {
-                        var self = Self;
-                        Context.System.Scheduler.Advanced.ScheduleOnce(TimeSpan.FromMilliseconds(1), () =>
-                        {
-                            if (!Socket.ConnectAsync(args))
-                                self.Tell(IO.Tcp.SocketConnected.Instance);
-                        });
+                        ScheduleConnectRetry();
                         Become(() => Connecting(remainingFinishConnectRetries - 1, args, null));
                         break;
                     }
@@ -239,6 +244,18 @@ private void Connecting(int remainingFinishConnectRetries, SocketAsyncEventArgs
                         break;
                 }
             });
+            Receive<RetryConnect>(_ =>
+            {
+                // Re-attempt the connection from within the actor's message loop so that any
+                // exception (e.g. PlatformNotSupportedException on Linux when reusing a socket
+                // after a failed connect attempt) is caught by ReportConnectFailure and reported
+                // to the commander as Tcp.CommandFailed instead of being swallowed by the scheduler.
+                ReportConnectFailure(() =>
+                {
+                    if (!Socket.ConnectAsync(args))
+                        Self.Tell(IO.Tcp.SocketConnected.Instance);
+                });
+            });
             Receive<ReceiveTimeout>(_ =>
             {
                 if (_connect.Timeout.HasValue) Context.SetReceiveTimeout(null); // Clear the timeout
@@ -246,6 +263,9 @@ private void Connecting(int remainingFinishConnectRetries, SocketAsyncEventArgs
                 Stop(new ConnectException($"Connect timeout of {_connect.Timeout} expired"));
             });
         }
+
+        private void ScheduleConnectRetry()
+            => Timers.StartSingleTimer(RetryConnectTimerKey, RetryConnect.Instance, TimeSpan.FromMilliseconds(1));
     }
 
     [InternalApi]