Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/Orleans.Core/Networking/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ private async Task ProcessOutgoing()

Exception error = default;
var serializer = this.shared.ServiceProvider.GetRequiredService<MessageSerializer>();
var messageStatisticsSink = this.shared.MessageStatisticsSink;
var messageObserver = this.shared.MessageStatisticsSink.GetMessageObserver();
try
{
var output = this._transport.Output;
Expand All @@ -375,7 +375,7 @@ private async Task ProcessOutgoing()
inflight.Add(message);
var (headerLength, bodyLength) = serializer.Write(output, message);
RecordMessageSend(message, headerLength + bodyLength, headerLength);
messageStatisticsSink.RecordMessage(message);
messageObserver?.Invoke(message);
message = null;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
#nullable enable
using System;
using Orleans.Runtime;

namespace Orleans.Placement.Repartitioning;

internal interface IMessageStatisticsSink
{
void RecordMessage(Message message);
Action<Message>? GetMessageObserver();
}

internal sealed class NoOpMessageStatisticsSink : IMessageStatisticsSink
{
public void RecordMessage(Message message) { }
public Action<Message>? GetMessageObserver() => null;
}
2 changes: 1 addition & 1 deletion src/Orleans.Runtime/Hosting/DefaultSiloServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ internal static void AddDefaultServices(ISiloBuilder builder)
services.TryAddSingleton<MessageCenter>();
services.TryAddFromExisting<IMessageCenter, MessageCenter>();
services.TryAddSingleton(FactoryUtility.Create<MessageCenter, Gateway>);
services.TryAddSingleton<IConnectedClientCollection>(sp => (IConnectedClientCollection)sp.GetRequiredService<MessageCenter>().Gateway ?? new EmptyConnectedClientCollection());
services.TryAddSingleton<IConnectedClientCollection>(sp => sp.GetRequiredService<MessageCenter>().Gateway as IConnectedClientCollection ?? new EmptyConnectedClientCollection());
services.TryAddSingleton<InternalGrainRuntime>();
services.TryAddSingleton<InsideRuntimeClient>();
services.TryAddFromExisting<IRuntimeClient, InsideRuntimeClient>();
Expand Down
55 changes: 28 additions & 27 deletions src/Orleans.Runtime/Messaging/MessageCenter.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#nullable enable
using System;
using System.Collections.Generic;
using System.Diagnostics;
Expand All @@ -22,12 +23,12 @@ internal class MessageCenter : IMessageCenter, IAsyncDisposable
private readonly SiloMessagingOptions messagingOptions;
private readonly PlacementService placementService;
private readonly GrainLocator _grainLocator;
private readonly IMessageStatisticsSink _messageStatisticsSink;
private readonly Action<Message>? _messageObserver;
private readonly ILogger log;
private readonly Catalog catalog;
private bool stopped;
private HostedClient hostedClient;
private Action<Message> sniffIncomingMessageHandler;
private HostedClient? hostedClient;
private Action<Message>? sniffIncomingMessageHandler;

public MessageCenter(
ILocalSiloDetails siloDetails,
Expand All @@ -50,7 +51,7 @@ public MessageCenter(
this.messagingTrace = messagingTrace;
this.placementService = placementService;
_grainLocator = grainLocator;
_messageStatisticsSink = messageStatisticsSink;
_messageObserver = messageStatisticsSink.GetMessageObserver();
this.log = logger;
this.messageFactory = messageFactory;
this._siloAddress = siloDetails.SiloAddress;
Expand All @@ -61,19 +62,19 @@ public MessageCenter(
}
}

public Gateway Gateway { get; }
public Gateway? Gateway { get; }

internal bool IsBlockingApplicationMessages { get; private set; }

public void SetHostedClient(HostedClient client) => this.hostedClient = client;
public void SetHostedClient(HostedClient? client) => this.hostedClient = client;

public bool TryDeliverToProxy(Message msg)
{
if (!msg.TargetGrain.IsClient()) return false;
if (this.Gateway is Gateway gateway && gateway.TryDeliverToProxy(msg)
|| this.hostedClient is HostedClient client && client.TryDispatchToClient(msg))
{
_messageStatisticsSink.RecordMessage(msg);
_messageObserver?.Invoke(msg);
return true;
}

Expand Down Expand Up @@ -125,7 +126,7 @@ public async Task StopAcceptingClientMessages()
}
}

public Action<Message> SniffIncomingMessage
public Action<Message>? SniffIncomingMessage
{
set
{
Expand Down Expand Up @@ -251,8 +252,8 @@ static async Task SendAsync(MessageCenter messageCenter, ValueTask<Connection> c
public void RejectMessage(
Message message,
Message.RejectionTypes rejectionType,
Exception exc,
string rejectInfo = null)
Exception? exc,
string? rejectInfo = null)
{
if (message.Direction == Message.Directions.Request
|| (message.Direction == Message.Directions.OneWay && message.HasCacheInvalidationHeader))
Expand All @@ -271,20 +272,20 @@ public void RejectMessage(

internal void ProcessRequestsToInvalidActivation(
List<Message> messages,
GrainAddress oldAddress,
SiloAddress forwardingAddress,
string failedOperation = null,
Exception exc = null,
GrainAddress? oldAddress,
SiloAddress? forwardingAddress,
string? failedOperation = null,
Exception? exc = null,
bool rejectMessages = false)
{
if (rejectMessages)
{
GrainAddress validAddress = forwardingAddress switch
GrainAddress? validAddress = forwardingAddress switch
{
null => null,
_ => new()
{
GrainId = oldAddress.GrainId,
GrainId = oldAddress?.GrainId ?? default,
SiloAddress = forwardingAddress,
}
};
Expand All @@ -304,12 +305,12 @@ internal void ProcessRequestsToInvalidActivation(
else
{
this.messagingTrace.OnDispatcherForwardingMultiple(messages.Count, oldAddress, forwardingAddress, failedOperation, exc);
GrainAddress destination = forwardingAddress switch
GrainAddress? destination = forwardingAddress switch
{
null => null,
_ => new()
{
GrainId = oldAddress.GrainId,
GrainId = oldAddress?.GrainId ?? default,
SiloAddress = forwardingAddress,
}
};
Expand All @@ -323,10 +324,10 @@ internal void ProcessRequestsToInvalidActivation(

private void ProcessRequestToInvalidActivation(
Message message,
GrainAddress oldAddress,
SiloAddress forwardingAddress,
GrainAddress? oldAddress,
SiloAddress? forwardingAddress,
string failedOperation,
Exception exc = null,
Exception? exc = null,
bool rejectMessages = false)
{
Debug.Assert(!message.IsLocalOnly);
Expand All @@ -344,20 +345,20 @@ private void ProcessRequestToInvalidActivation(
}
else
{
GrainAddress destination = forwardingAddress switch
GrainAddress? destination = forwardingAddress switch
{
null => null,
_ => new()
{
GrainId = oldAddress.GrainId,
GrainId = oldAddress?.GrainId ?? default,
SiloAddress = forwardingAddress,
}
};
this.TryForwardRequest(message, oldAddress, destination, failedOperation, exc);
}
}

private void TryForwardRequest(Message message, GrainAddress oldAddress, GrainAddress destination, string failedOperation = null, Exception exc = null)
private void TryForwardRequest(Message message, GrainAddress? oldAddress, GrainAddress? destination, string? failedOperation = null, Exception? exc = null)
{
Debug.Assert(!message.IsLocalOnly);

Expand Down Expand Up @@ -415,7 +416,7 @@ internal void RerouteMessage(Message message)
ResendMessageImpl(message);
}

private bool TryForwardMessage(Message message, SiloAddress forwardingAddress)
private bool TryForwardMessage(Message message, SiloAddress? forwardingAddress)
{
if (!MayForward(message, this.messagingOptions)) return false;

Expand All @@ -425,7 +426,7 @@ private bool TryForwardMessage(Message message, SiloAddress forwardingAddress)
return true;
}

private void ResendMessageImpl(Message message, SiloAddress forwardingAddress = null)
private void ResendMessageImpl(Message message, SiloAddress? forwardingAddress = null)
{
if (log.IsEnabled(LogLevel.Debug)) log.LogDebug("Resend {Message}", message);

Expand Down Expand Up @@ -543,7 +544,7 @@ public void ReceiveMessage(Message msg)
}

targetActivation.ReceiveMessage(msg);
_messageStatisticsSink.RecordMessage(msg);
_messageObserver?.Invoke(msg);
}
}
catch (Exception ex)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#nullable enable
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -109,7 +110,9 @@ private async Task ProcessPendingEdges(CancellationToken cancellationToken)
}
}

public void RecordMessage(Message message)
public Action<Message>? GetMessageObserver() => RecordMessage;

private void RecordMessage(Message message)
{
if (!_enableMessageSampling || message.IsSystemMessage)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#nullable enable
using Orleans;
using Orleans.Metadata;
using System;
using System.Collections.Concurrent;
Expand Down