Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 101 additions & 51 deletions src/Orleans.Core/Networking/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

namespace Orleans.Runtime.Messaging
{
internal abstract class Connection
internal abstract partial class Connection
{
private static readonly Func<ConnectionContext, Task> OnConnectedDelegate = context => OnConnectedAsync(context);
private static readonly Action<object> OnConnectionClosedDelegate = state => ((Connection)state).OnTransportConnectionClosed();
Expand Down Expand Up @@ -152,13 +152,7 @@ private void StartClosing(Exception exception)
_initializationTcs.TrySetException(exception ?? new ConnectionAbortedException("Connection initialization failed"));
_initializationTcs.Task.Ignore();

if (this.Log.IsEnabled(LogLevel.Information))
{
this.Log.LogInformation(
exception,
"Closing connection {Connection}",
this);
}
LogInformationClosingConnection(this.Log, exception, this);

task.Start(TaskScheduler.Default);
}
Expand Down Expand Up @@ -188,7 +182,7 @@ private async Task CloseAsync()
catch (Exception processIncomingException)
{
// Swallow any exceptions here.
this.Log.LogWarning(processIncomingException, "Exception processing incoming messages on connection {Connection}", this);
LogWarningExceptionProcessingIncomingMessages(this.Log, processIncomingException, this);
}
}

Expand All @@ -201,7 +195,7 @@ private async Task CloseAsync()
catch (Exception processOutgoingException)
{
// Swallow any exceptions here.
this.Log.LogWarning(processOutgoingException, "Exception processing outgoing messages on connection {Connection}", this);
LogWarningExceptionProcessingOutgoingMessages(this.Log, processOutgoingException, this);
}
}

Expand All @@ -215,7 +209,7 @@ private async Task CloseAsync()
}
catch (Exception exception)
{
this.Log.LogWarning(exception, "Exception aborting connection {Connection}", this);
LogWarningExceptionAbortingConnection(this.Log, exception, this);
}

await _transportConnectionClosed.Task;
Expand All @@ -228,7 +222,7 @@ private async Task CloseAsync()
catch (Exception abortException)
{
// Swallow any exceptions here.
this.Log.LogWarning(abortException, "Exception terminating connection {Connection}", this);
LogWarningExceptionTerminatingConnection(this.Log, abortException, this);
}

// Reject in-flight messages.
Expand All @@ -243,23 +237,18 @@ private async Task CloseAsync()
var i = 0;
while (this.outgoingMessages.Reader.TryRead(out var message))
{
if (i == 0 && Log.IsEnabled(LogLevel.Information))
if (i == 0)
{
this.Log.LogInformation(
"Rerouting messages for remote endpoint {EndPoint}",
this.RemoteEndPoint?.ToString() ?? "(never connected)");
LogInformationReroutingMessages(this.Log, new EndPointLogValue(this.RemoteEndPoint));
}

++i;
this.RetryMessage(message);
}

if (i > 0 && this.Log.IsEnabled(LogLevel.Information))
if (i > 0)
{
this.Log.LogInformation(
"Rerouted {Count} messages for remote endpoint {EndPoint}",
i,
this.RemoteEndPoint?.ToString() ?? "(never connected)");
LogInformationReroutedMessages(this.Log, i, new EndPointLogValue(this.RemoteEndPoint));
}
}

Expand Down Expand Up @@ -334,10 +323,7 @@ private async Task ProcessIncoming()
{
if (IsValid)
{
this.Log.LogWarning(
exception,
"Exception while processing messages from remote endpoint {EndPoint}",
this.RemoteEndPoint);
LogWarningExceptionProcessingMessagesFromRemote(this.Log, exception, this.RemoteEndPoint);
}

error = exception;
Expand Down Expand Up @@ -402,10 +388,7 @@ private async Task ProcessOutgoing()
{
if (IsValid)
{
this.Log.LogWarning(
exception,
"Exception while processing messages to remote endpoint {EndPoint}",
this.RemoteEndPoint);
LogWarningExceptionProcessingMessagesToRemote(this.Log, exception, this.RemoteEndPoint);
}

error = exception;
Expand All @@ -419,13 +402,7 @@ private async Task ProcessOutgoing()

private void RerouteMessage(Message message)
{
if (this.Log.IsEnabled(LogLevel.Information))
{
this.Log.LogInformation(
"Rerouting message {Message} from remote endpoint {EndPoint}",
message,
this.RemoteEndPoint?.ToString() ?? "(never connected)");
}
LogInformationReroutingMessage(this.Log, message, new EndPointLogValue(this.RemoteEndPoint));

ThreadPool.UnsafeQueueUserWorkItem(state =>
{
Expand Down Expand Up @@ -453,12 +430,7 @@ private static EndPoint NormalizeEndpoint(EndPoint endpoint)
/// <returns><see langword="true"/> if the exception should not be caught and <see langword="false"/> if it should be caught.</returns>
private bool HandleReceiveMessageFailure(Message message, Exception exception)
{
this.Log.LogError(
exception,
"Exception reading message {Message} from remote endpoint {Remote} to local endpoint {Local}",
message,
this.RemoteEndPoint,
this.LocalEndPoint);
LogErrorExceptionReadingMessage(this.Log, exception, message, this.RemoteEndPoint, this.LocalEndPoint);

// If deserialization completely failed, rethrow the exception so that it can be handled at another level.
if (message is null || exception is InvalidMessageFrameException)
Expand Down Expand Up @@ -500,12 +472,7 @@ private bool HandleSendMessageFailure(Message message, Exception exception)
// We get here if we failed to serialize the msg (or any other catastrophic failure).
// Request msg fails to serialize on the sender, so we just enqueue a rejection msg.
// Response msg fails to serialize on the responding silo, so we try to send an error response back.
this.Log.LogError(
exception,
"Exception sending message {Message} to remote endpoint {Remote} from local endpoint {Local}",
message,
this.RemoteEndPoint,
this.LocalEndPoint);
LogErrorExceptionSendingMessage(this.Log, exception, message, this.RemoteEndPoint, this.LocalEndPoint);

if (message is null || exception is InvalidMessageFrameException)
{
Expand Down Expand Up @@ -535,10 +502,9 @@ private bool HandleSendMessageFailure(Message message, Exception exception)
}
else
{
this.Log.LogWarning(
(int)ErrorCode.Messaging_OutgoingMS_DroppingMessage,
LogWarningDroppingMessage(
this.Log,
exception,
"Dropping message which failed during serialization: {Message}",
message);

MessagingInstruments.OnDroppedSentMessage(message);
Expand Down Expand Up @@ -581,5 +547,89 @@ public void Reset()
this.connection = null;
}
}

private readonly struct EndPointLogValue(EndPoint endPoint)
{
public override string ToString() => endPoint?.ToString() ?? "(never connected)";
}

[LoggerMessage(
Level = LogLevel.Information,
Message = "Closing connection {Connection}"
)]
private static partial void LogInformationClosingConnection(ILogger logger, Exception exception, Connection connection);

[LoggerMessage(
Level = LogLevel.Warning,
Message = "Exception processing incoming messages on connection {Connection}"
)]
private static partial void LogWarningExceptionProcessingIncomingMessages(ILogger logger, Exception exception, Connection connection);

[LoggerMessage(
Level = LogLevel.Warning,
Message = "Exception processing outgoing messages on connection {Connection}"
)]
private static partial void LogWarningExceptionProcessingOutgoingMessages(ILogger logger, Exception exception, Connection connection);

[LoggerMessage(
Level = LogLevel.Warning,
Message = "Exception aborting connection {Connection}"
)]
private static partial void LogWarningExceptionAbortingConnection(ILogger logger, Exception exception, Connection connection);

[LoggerMessage(
Level = LogLevel.Warning,
Message = "Exception terminating connection {Connection}"
)]
private static partial void LogWarningExceptionTerminatingConnection(ILogger logger, Exception exception, Connection connection);

[LoggerMessage(
Level = LogLevel.Information,
Message = "Rerouting messages for remote endpoint {EndPoint}"
)]
private static partial void LogInformationReroutingMessages(ILogger logger, EndPointLogValue endPoint);

[LoggerMessage(
Level = LogLevel.Information,
Message = "Rerouted {Count} messages for remote endpoint {EndPoint}"
)]
private static partial void LogInformationReroutedMessages(ILogger logger, int count, EndPointLogValue endPoint);

[LoggerMessage(
Level = LogLevel.Warning,
Message = "Exception while processing messages from remote endpoint {EndPoint}"
)]
private static partial void LogWarningExceptionProcessingMessagesFromRemote(ILogger logger, Exception exception, EndPoint endPoint);

[LoggerMessage(
Level = LogLevel.Warning,
Message = "Exception while processing messages to remote endpoint {EndPoint}"
)]
private static partial void LogWarningExceptionProcessingMessagesToRemote(ILogger logger, Exception exception, EndPoint endPoint);

[LoggerMessage(
Level = LogLevel.Information,
Message = "Rerouting message {Message} from remote endpoint {EndPoint}"
)]
private static partial void LogInformationReroutingMessage(ILogger logger, Message message, EndPointLogValue endPoint);

[LoggerMessage(
Level = LogLevel.Error,
Message = "Exception reading message {Message} from remote endpoint {Remote} to local endpoint {Local}"
)]
private static partial void LogErrorExceptionReadingMessage(ILogger logger, Exception exception, Message message, EndPoint remote, EndPoint local);

[LoggerMessage(
Level = LogLevel.Error,
Message = "Exception sending message {Message} to remote endpoint {Remote} from local endpoint {Local}"
)]
private static partial void LogErrorExceptionSendingMessage(ILogger logger, Exception exception, Message message, EndPoint remote, EndPoint local);

[LoggerMessage(
EventId = (int)ErrorCode.Messaging_OutgoingMS_DroppingMessage,
Level = LogLevel.Warning,
Message = "Dropping message which failed during serialization: {Message}"
)]
private static partial void LogWarningDroppingMessage(ILogger logger, Exception exception, Message message);
}
}
18 changes: 15 additions & 3 deletions src/Orleans.Core/Networking/Shared/SocketConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

namespace Orleans.Networking.Shared
{
internal sealed class SocketConnection : TransportConnection
internal sealed partial class SocketConnection : TransportConnection
{
private static readonly int MinAllocBufferSize = SlabMemoryPool.BlockSize / 2;
private static readonly bool IsWindows = RuntimeInformation.IsOSPlatform(OSPlatform.Windows);
Expand Down Expand Up @@ -99,7 +99,7 @@ private async Task StartAsync()
}
catch (Exception ex)
{
_trace.LogError(0, ex, $"Unexpected exception in {nameof(SocketConnection)}.{nameof(StartAsync)}.");
LogErrorUnexpectedExceptionInStartAsync(_trace, ex);
}
}

Expand Down Expand Up @@ -355,7 +355,7 @@ private void CancelConnectionClosedToken()
}
catch (Exception ex)
{
_trace.LogError(0, ex, $"Unexpected exception in {nameof(SocketConnection)}.{nameof(CancelConnectionClosedToken)}.");
LogErrorUnexpectedExceptionInCancelConnectionClosedToken(_trace, ex);
}
}

Expand All @@ -376,5 +376,17 @@ private static bool IsConnectionAbortError(SocketError errorCode)
errorCode == SocketError.Interrupted ||
(errorCode == SocketError.InvalidArgument && !IsWindows);
}

[LoggerMessage(
Level = LogLevel.Error,
Message = "Unexpected exception in SocketConnection.StartAsync."
)]
private static partial void LogErrorUnexpectedExceptionInStartAsync(ILogger logger, Exception exception);

[LoggerMessage(
Level = LogLevel.Error,
Message = "Unexpected exception in SocketConnection.CancelConnectionClosedToken."
)]
private static partial void LogErrorUnexpectedExceptionInCancelConnectionClosedToken(ILogger logger, Exception exception);
}
}
Loading