Skip to content

Commit a85e337

Browse files
authored
Activation repartitioner: use null for no-op message observer to avoid interface call (#9056)
1 parent a44d4a3 commit a85e337

File tree

6 files changed

+39
-34
lines changed

6 files changed

+39
-34
lines changed

src/Orleans.Core/Networking/Connection.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ private async Task ProcessOutgoing()
353353

354354
Exception error = default;
355355
var serializer = this.shared.ServiceProvider.GetRequiredService<MessageSerializer>();
356-
var messageStatisticsSink = this.shared.MessageStatisticsSink;
356+
var messageObserver = this.shared.MessageStatisticsSink.GetMessageObserver();
357357
try
358358
{
359359
var output = this._transport.Output;
@@ -375,7 +375,7 @@ private async Task ProcessOutgoing()
375375
inflight.Add(message);
376376
var (headerLength, bodyLength) = serializer.Write(output, message);
377377
RecordMessageSend(message, headerLength + bodyLength, headerLength);
378-
messageStatisticsSink.RecordMessage(message);
378+
messageObserver?.Invoke(message);
379379
message = null;
380380
}
381381
}
Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
1+
#nullable enable
2+
using System;
13
using Orleans.Runtime;
24

35
namespace Orleans.Placement.Repartitioning;
46

57
internal interface IMessageStatisticsSink
68
{
7-
void RecordMessage(Message message);
9+
Action<Message>? GetMessageObserver();
810
}
911

1012
internal sealed class NoOpMessageStatisticsSink : IMessageStatisticsSink
1113
{
12-
public void RecordMessage(Message message) { }
14+
public Action<Message>? GetMessageObserver() => null;
1315
}

src/Orleans.Runtime/Hosting/DefaultSiloServices.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ internal static void AddDefaultServices(ISiloBuilder builder)
139139
services.TryAddSingleton<MessageCenter>();
140140
services.TryAddFromExisting<IMessageCenter, MessageCenter>();
141141
services.TryAddSingleton(FactoryUtility.Create<MessageCenter, Gateway>);
142-
services.TryAddSingleton<IConnectedClientCollection>(sp => (IConnectedClientCollection)sp.GetRequiredService<MessageCenter>().Gateway ?? new EmptyConnectedClientCollection());
142+
services.TryAddSingleton<IConnectedClientCollection>(sp => sp.GetRequiredService<MessageCenter>().Gateway as IConnectedClientCollection ?? new EmptyConnectedClientCollection());
143143
services.TryAddSingleton<InternalGrainRuntime>();
144144
services.TryAddSingleton<InsideRuntimeClient>();
145145
services.TryAddFromExisting<IRuntimeClient, InsideRuntimeClient>();

src/Orleans.Runtime/Messaging/MessageCenter.cs

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#nullable enable
12
using System;
23
using System.Collections.Generic;
34
using System.Diagnostics;
@@ -22,12 +23,12 @@ internal class MessageCenter : IMessageCenter, IAsyncDisposable
2223
private readonly SiloMessagingOptions messagingOptions;
2324
private readonly PlacementService placementService;
2425
private readonly GrainLocator _grainLocator;
25-
private readonly IMessageStatisticsSink _messageStatisticsSink;
26+
private readonly Action<Message>? _messageObserver;
2627
private readonly ILogger log;
2728
private readonly Catalog catalog;
2829
private bool stopped;
29-
private HostedClient hostedClient;
30-
private Action<Message> sniffIncomingMessageHandler;
30+
private HostedClient? hostedClient;
31+
private Action<Message>? sniffIncomingMessageHandler;
3132

3233
public MessageCenter(
3334
ILocalSiloDetails siloDetails,
@@ -50,7 +51,7 @@ public MessageCenter(
5051
this.messagingTrace = messagingTrace;
5152
this.placementService = placementService;
5253
_grainLocator = grainLocator;
53-
_messageStatisticsSink = messageStatisticsSink;
54+
_messageObserver = messageStatisticsSink.GetMessageObserver();
5455
this.log = logger;
5556
this.messageFactory = messageFactory;
5657
this._siloAddress = siloDetails.SiloAddress;
@@ -61,19 +62,19 @@ public MessageCenter(
6162
}
6263
}
6364

64-
public Gateway Gateway { get; }
65+
public Gateway? Gateway { get; }
6566

6667
internal bool IsBlockingApplicationMessages { get; private set; }
6768

68-
public void SetHostedClient(HostedClient client) => this.hostedClient = client;
69+
public void SetHostedClient(HostedClient? client) => this.hostedClient = client;
6970

7071
public bool TryDeliverToProxy(Message msg)
7172
{
7273
if (!msg.TargetGrain.IsClient()) return false;
7374
if (this.Gateway is Gateway gateway && gateway.TryDeliverToProxy(msg)
7475
|| this.hostedClient is HostedClient client && client.TryDispatchToClient(msg))
7576
{
76-
_messageStatisticsSink.RecordMessage(msg);
77+
_messageObserver?.Invoke(msg);
7778
return true;
7879
}
7980

@@ -125,7 +126,7 @@ public async Task StopAcceptingClientMessages()
125126
}
126127
}
127128

128-
public Action<Message> SniffIncomingMessage
129+
public Action<Message>? SniffIncomingMessage
129130
{
130131
set
131132
{
@@ -251,8 +252,8 @@ static async Task SendAsync(MessageCenter messageCenter, ValueTask<Connection> c
251252
public void RejectMessage(
252253
Message message,
253254
Message.RejectionTypes rejectionType,
254-
Exception exc,
255-
string rejectInfo = null)
255+
Exception? exc,
256+
string? rejectInfo = null)
256257
{
257258
if (message.Direction == Message.Directions.Request
258259
|| (message.Direction == Message.Directions.OneWay && message.HasCacheInvalidationHeader))
@@ -271,20 +272,20 @@ public void RejectMessage(
271272

272273
internal void ProcessRequestsToInvalidActivation(
273274
List<Message> messages,
274-
GrainAddress oldAddress,
275-
SiloAddress forwardingAddress,
276-
string failedOperation = null,
277-
Exception exc = null,
275+
GrainAddress? oldAddress,
276+
SiloAddress? forwardingAddress,
277+
string? failedOperation = null,
278+
Exception? exc = null,
278279
bool rejectMessages = false)
279280
{
280281
if (rejectMessages)
281282
{
282-
GrainAddress validAddress = forwardingAddress switch
283+
GrainAddress? validAddress = forwardingAddress switch
283284
{
284285
null => null,
285286
_ => new()
286287
{
287-
GrainId = oldAddress.GrainId,
288+
GrainId = oldAddress?.GrainId ?? default,
288289
SiloAddress = forwardingAddress,
289290
}
290291
};
@@ -304,12 +305,12 @@ internal void ProcessRequestsToInvalidActivation(
304305
else
305306
{
306307
this.messagingTrace.OnDispatcherForwardingMultiple(messages.Count, oldAddress, forwardingAddress, failedOperation, exc);
307-
GrainAddress destination = forwardingAddress switch
308+
GrainAddress? destination = forwardingAddress switch
308309
{
309310
null => null,
310311
_ => new()
311312
{
312-
GrainId = oldAddress.GrainId,
313+
GrainId = oldAddress?.GrainId ?? default,
313314
SiloAddress = forwardingAddress,
314315
}
315316
};
@@ -323,10 +324,10 @@ internal void ProcessRequestsToInvalidActivation(
323324

324325
private void ProcessRequestToInvalidActivation(
325326
Message message,
326-
GrainAddress oldAddress,
327-
SiloAddress forwardingAddress,
327+
GrainAddress? oldAddress,
328+
SiloAddress? forwardingAddress,
328329
string failedOperation,
329-
Exception exc = null,
330+
Exception? exc = null,
330331
bool rejectMessages = false)
331332
{
332333
Debug.Assert(!message.IsLocalOnly);
@@ -344,20 +345,20 @@ private void ProcessRequestToInvalidActivation(
344345
}
345346
else
346347
{
347-
GrainAddress destination = forwardingAddress switch
348+
GrainAddress? destination = forwardingAddress switch
348349
{
349350
null => null,
350351
_ => new()
351352
{
352-
GrainId = oldAddress.GrainId,
353+
GrainId = oldAddress?.GrainId ?? default,
353354
SiloAddress = forwardingAddress,
354355
}
355356
};
356357
this.TryForwardRequest(message, oldAddress, destination, failedOperation, exc);
357358
}
358359
}
359360

360-
private void TryForwardRequest(Message message, GrainAddress oldAddress, GrainAddress destination, string failedOperation = null, Exception exc = null)
361+
private void TryForwardRequest(Message message, GrainAddress? oldAddress, GrainAddress? destination, string? failedOperation = null, Exception? exc = null)
361362
{
362363
Debug.Assert(!message.IsLocalOnly);
363364

@@ -415,7 +416,7 @@ internal void RerouteMessage(Message message)
415416
ResendMessageImpl(message);
416417
}
417418

418-
private bool TryForwardMessage(Message message, SiloAddress forwardingAddress)
419+
private bool TryForwardMessage(Message message, SiloAddress? forwardingAddress)
419420
{
420421
if (!MayForward(message, this.messagingOptions)) return false;
421422

@@ -425,7 +426,7 @@ private bool TryForwardMessage(Message message, SiloAddress forwardingAddress)
425426
return true;
426427
}
427428

428-
private void ResendMessageImpl(Message message, SiloAddress forwardingAddress = null)
429+
private void ResendMessageImpl(Message message, SiloAddress? forwardingAddress = null)
429430
{
430431
if (log.IsEnabled(LogLevel.Debug)) log.LogDebug("Resend {Message}", message);
431432

@@ -543,7 +544,7 @@ public void ReceiveMessage(Message msg)
543544
}
544545

545546
targetActivation.ReceiveMessage(msg);
546-
_messageStatisticsSink.RecordMessage(msg);
547+
_messageObserver?.Invoke(msg);
547548
}
548549
}
549550
catch (Exception ex)

src/Orleans.Runtime/Placement/Repartitioning/ActivationRepartitioner.MessageSink.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#nullable enable
22
using System;
33
using System.Diagnostics;
4+
using System.Runtime.CompilerServices;
45
using System.Threading;
56
using System.Threading.Tasks;
67
using Microsoft.Extensions.Logging;
@@ -109,7 +110,9 @@ private async Task ProcessPendingEdges(CancellationToken cancellationToken)
109110
}
110111
}
111112

112-
public void RecordMessage(Message message)
113+
public Action<Message>? GetMessageObserver() => RecordMessage;
114+
115+
private void RecordMessage(Message message)
113116
{
114117
if (!_enableMessageSampling || message.IsSystemMessage)
115118
{

src/Orleans.Runtime/Placement/Repartitioning/RepartitionerMessageFilter.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
#nullable enable
2-
using Orleans;
32
using Orleans.Metadata;
43
using System;
54
using System.Collections.Concurrent;

0 commit comments

Comments
 (0)