Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions src/core/Akka.Tests/IO/TcpIntegrationSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,50 @@ public async Task Should_report_Error_only_once_when_connecting_to_unreachable_D
replies.Count.ShouldBe(1);
}

[Fact]
public async Task Should_report_CommandFailed_when_a_scheduled_connect_retry_throws()
{
// Regression test for https://github.com/akkadotnet/akka.net/issues/8195
//
// When a connect attempt fails, TcpOutgoingConnection schedules a retry. That retry
// used to run on the scheduler thread, so any exception it threw (e.g.
// PlatformNotSupportedException on Linux when reusing a socket after a failed
// connect) was logged and swallowed by the scheduler - the commander was never
// told and stayed stuck forever.
//
// This exercises the same code path deterministically and cross-platform: an
// IPv4-forced socket is driven down the DNS-fallback retry path and asked to
// retry against an IPv6 address, which throws NotSupportedException everywhere.
// The fix must surface that failure to the commander as Tcp.CommandFailed.

const string host = "akka-test-8195.invalid";

// Pre-seed the DNS cache so the host resolves to BOTH an IPv4 and an IPv6
// address, forcing TcpOutgoingConnection down its DNS-fallback retry path.
var cache = (SimpleDnsCache)Akka.IO.Dns.Instance.Apply(Sys).Cache;
cache.Put(
new Akka.IO.Dns.Resolved(host, new[] { IPAddress.Loopback }, new[] { IPAddress.IPv6Loopback }),
ttl: (long)TimeSpan.FromMinutes(10).TotalMilliseconds);

// Grab a free loopback port then release it, so the connection attempt is refused.
int port;
using (var probeSocket = new Socket(SocketType.Stream, ProtocolType.Tcp))
{
probeSocket.Bind(new IPEndPoint(IPAddress.Loopback, 0));
port = ((IPEndPoint)probeSocket.LocalEndPoint).Port;
}

// OutgoingSocketForceIpv4 => the connection actor allocates an IPv4-only socket,
// so the IPv6 fallback retry throws NotSupportedException.
var settings = TcpSettings.Create(Sys) with { OutgoingSocketForceIpv4 = true };

var connectCommander = CreateTestProbe();
connectCommander.Send(Sys.Tcp(),
new Tcp.Connect(new DnsEndPoint(host, port)) { TcpSettings = settings });

await connectCommander.ExpectMsgAsync<Tcp.CommandFailed>(TimeSpan.FromSeconds(10));
}

[Fact]
public async Task The_TCP_transport_implementation_handle_tcp_connection_actor_death_properly()
{
Expand Down
46 changes: 33 additions & 13 deletions src/core/Akka/IO/TcpOutgoingConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ namespace Akka.IO
/// An actor handling the connection state machine for an outgoing connection
/// to be established.
/// </summary>
internal sealed class TcpOutgoingConnection : TcpConnection
internal sealed class TcpOutgoingConnection : TcpConnection, IWithTimers
{
private const string RetryConnectTimerKey = "retry-connect";

private readonly IActorRef _commander;
private readonly Tcp.Connect _connect;

Expand All @@ -30,6 +32,19 @@ internal sealed class TcpOutgoingConnection : TcpConnection
private readonly ConnectException _finishConnectNeverReturnedTrueException =
new("Could not establish connection because finishConnect never returned true");

public ITimerScheduler Timers { get; set; }

/// <summary>
/// Internal trigger used to re-attempt an outgoing connection from inside the
/// actor's own message loop, so connect failures are reported as <see cref="Tcp.CommandFailed"/>
/// instead of being swallowed by the scheduler thread.
/// </summary>
private sealed class RetryConnect
{
public static readonly RetryConnect Instance = new();
private RetryConnect() { }
}

public TcpOutgoingConnection(TcpExt tcp, IActorRef commander, Tcp.Connect connect)
: base(
(connect.TcpSettings ?? tcp.Settings),
Expand Down Expand Up @@ -210,25 +225,15 @@ private void Connecting(int remainingFinishConnectRetries, SocketAsyncEventArgs
// used only when we've resolved a DNS endpoint.
case > 0 when fallbackAddress != null:
{
var self = Self;
var previousAddress = (IPEndPoint)args.RemoteEndPoint;
args.RemoteEndPoint = fallbackAddress;
Context.System.Scheduler.Advanced.ScheduleOnce(TimeSpan.FromMilliseconds(1), () =>

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah so this was the real bug - having a lambda running outside the actor attempting the socket reconnect. We probably should have never designed it that way. Refactoring it to use an in-actor scheduled message instead.

{
if (!Socket.ConnectAsync(args))
self.Tell(IO.Tcp.SocketConnected.Instance);
});
ScheduleConnectRetry();
Become(() => Connecting(remainingFinishConnectRetries - 1, args, previousAddress));
break;
}
case > 0:
{
var self = Self;
Context.System.Scheduler.Advanced.ScheduleOnce(TimeSpan.FromMilliseconds(1), () =>
{
if (!Socket.ConnectAsync(args))
self.Tell(IO.Tcp.SocketConnected.Instance);
});
ScheduleConnectRetry();
Become(() => Connecting(remainingFinishConnectRetries - 1, args, null));
break;
}
Expand All @@ -239,13 +244,28 @@ private void Connecting(int remainingFinishConnectRetries, SocketAsyncEventArgs
break;
}
});
Receive<RetryConnect>(_ =>
{
// Re-attempt the connection from within the actor's message loop so that any
// exception (e.g. PlatformNotSupportedException on Linux when reusing a socket
// after a failed connect attempt) is caught by ReportConnectFailure and reported
// to the commander as Tcp.CommandFailed instead of being swallowed by the scheduler.
ReportConnectFailure(() =>
{
if (!Socket.ConnectAsync(args))

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

somewhat confusing: does not return a Task. Returns true if the connect operation is pending (and we'll get a SocketAsyncEventArgs event to that effect later when it comples,) returns false if the operation already completed synchronously, which is why we send a SocketConnected message to ourselves immediately.

Self.Tell(IO.Tcp.SocketConnected.Instance);
});
});
Receive<ReceiveTimeout>(_ =>
{
if (_connect.Timeout.HasValue) Context.SetReceiveTimeout(null); // Clear the timeout
Log.Debug("Connect timeout expired, could not establish connection to [{0}]", _connect.RemoteAddress);
Stop(new ConnectException($"Connect timeout of {_connect.Timeout} expired"));
});
}

private void ScheduleConnectRetry()
=> Timers.StartSingleTimer(RetryConnectTimerKey, RetryConnect.Instance, TimeSpan.FromMilliseconds(1));

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

preserves the same timing mechanics as before, just via messaging rather than delegates.

}

[InternalApi]
Expand Down
Loading