Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions src/core/Akka.Tests/IO/TcpIntegrationSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,40 @@ public async Task Should_report_Error_only_once_when_connecting_to_unreachable_D
reply.Should().NotBeNull();
}

[Fact]
public async Task Should_report_CommandFailed_when_outgoing_connection_is_refused()
{
    // Regression guard for https://github.com/akkadotnet/akka.net/issues/8195
    //
    // Background: after a failed connect attempt TcpOutgoingConnection schedules a retry.
    // That retry used to execute on the scheduler thread, where anything it threw
    // (e.g. PlatformNotSupportedException on Linux when a socket is reused after a
    // failed connect) was logged and swallowed by the scheduler, leaving the commander
    // waiting forever. The retry now executes inside the actor's message loop, so such
    // a failure reaches the commander as Tcp.CommandFailed.
    //
    // Contract under test: the commander always receives CommandFailed; it never hangs.
    var commander = CreateTestProbe();

    // Discover a free loopback port by binding to port 0, then dispose the socket
    // before connecting so that nothing is listening on that port any more.
    int refusedPort;
    using (var portProbe = new Socket(SocketType.Stream, ProtocolType.Tcp))
    {
        portProbe.Bind(new IPEndPoint(IPAddress.Loopback, 0));
        var boundEndPoint = (IPEndPoint)portProbe.LocalEndPoint;
        refusedPort = boundEndPoint.Port;
    }

    // The explicit connect timeout keeps the result deterministic on every platform:
    // Linux refuses the connection quickly and exercises the retry path, whereas on
    // platforms where connecting to an unused port does not fail fast (e.g. Windows)
    // the connect-timeout ReceiveTimeout still guarantees that CommandFailed arrives.
    var unusedEndPoint = new IPEndPoint(IPAddress.Loopback, refusedPort);
    var connectTimeout = TimeSpan.FromSeconds(3);
    var connectCommand = new Tcp.Connect(unusedEndPoint, timeout: connectTimeout);
    commander.Send(Sys.Tcp(), connectCommand);

    // Generous upper bound so that slow CI machines still observe the failure.
    var maxWait = TimeSpan.FromSeconds(20);
    await commander.ExpectMsgAsync<Tcp.CommandFailed>(maxWait);
}

[Fact]
public async Task The_TCP_transport_implementation_handle_tcp_connection_actor_death_properly()
{
Expand Down
39 changes: 32 additions & 7 deletions src/core/Akka/IO/TcpOutgoingConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ namespace Akka.IO
/// An actor handling the connection state machine for an outgoing connection
/// to be established.
/// </summary>
internal sealed class TcpOutgoingConnection : TcpConnection
internal sealed class TcpOutgoingConnection : TcpConnection, IWithTimers
{
private const string RetryConnectTimerKey = "retry-connect";

private readonly IActorRef _commander;
private readonly Tcp.Connect _connect;

Expand All @@ -33,6 +35,19 @@ internal sealed class TcpOutgoingConnection : TcpConnection
private readonly ConnectException _finishConnectNeverReturnedTrueException =
new("Could not establish connection because finishConnect never returned true");

public ITimerScheduler Timers { get; set; } = null!;

/// <summary>
/// Self-addressed marker message that asks this actor to try the outgoing connection again.
/// Handling the retry inside the actor's own message loop means a failing connect attempt is
/// reported as <see cref="Tcp.CommandFailed"/> rather than being swallowed by the scheduler thread.
/// </summary>
private sealed class RetryConnect
{
    /// <summary>
    /// The only instance of this message; the type carries no state.
    /// </summary>
    public static readonly RetryConnect Instance = new RetryConnect();

    // Private so that no instance other than <see cref="Instance"/> can be created.
    private RetryConnect()
    {
    }
}

public TcpOutgoingConnection(TcpExt tcp, IActorRef commander, Tcp.Connect connect)
: base(
(connect.TcpSettings ?? tcp.Settings),
Expand Down Expand Up @@ -204,12 +219,7 @@ private void Connecting(int remainingFinishConnectRetries, SocketAsyncEventArgs
{
case > 0:
{
var self = Self;
Context.System.Scheduler.Advanced.ScheduleOnce(TimeSpan.FromMilliseconds(1), () =>
{
if (!Socket.ConnectAsync(args))
self.Tell(IO.Tcp.SocketConnected.Instance);
});
ScheduleConnectRetry();
Become(() => Connecting(remainingFinishConnectRetries - 1, args));
break;
}
Expand All @@ -220,13 +230,28 @@ private void Connecting(int remainingFinishConnectRetries, SocketAsyncEventArgs
break;
}
});
Receive<RetryConnect>(_ =>
{
// Re-attempt the connection from within the actor's message loop so that any
// exception (e.g. PlatformNotSupportedException on Linux when reusing a socket
// after a failed connect attempt) is caught by ReportConnectFailure and reported
// to the commander as Tcp.CommandFailed instead of being swallowed by the scheduler.
ReportConnectFailure(() =>
{
if (!Socket.ConnectAsync(args))
Self.Tell(IO.Tcp.SocketConnected.Instance);
});
});
Receive<ReceiveTimeout>(_ =>
{
if (_connect.Timeout.HasValue) Context.SetReceiveTimeout(null);
Log.Debug("Connect timeout expired, could not establish connection to [{0}]", _connect.RemoteAddress);
Stop(new ConnectException($"Connect timeout of {_connect.Timeout} expired"));
});
}

/// <summary>
/// Starts a single-shot timer, registered under <see cref="RetryConnectTimerKey"/>, that
/// emits <see cref="RetryConnect.Instance"/> after one millisecond.
/// </summary>
private void ScheduleConnectRetry()
{
    var retryDelay = TimeSpan.FromMilliseconds(1);
    Timers.StartSingleTimer(RetryConnectTimerKey, RetryConnect.Instance, retryDelay);
}
}

[InternalApi]
Expand Down
Loading