Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions src/Orleans.Runtime/Activation/ActivationDataActivatorProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace Orleans.Runtime
{
internal class ActivationDataActivatorProvider : IGrainContextActivatorProvider
internal partial class ActivationDataActivatorProvider : IGrainContextActivatorProvider
{
private readonly IServiceProvider _serviceProvider;
private readonly IActivationWorkingSet _activationWorkingSet;
Expand Down Expand Up @@ -81,7 +81,7 @@ public bool TryGet(GrainType grainType, out IGrainContextActivator activator)
return true;
}

private class ActivationDataActivator : IGrainContextActivator
private partial class ActivationDataActivator : IGrainContextActivator
{
private readonly ILogger<WorkItemGroup> _workItemGroupLogger;
private readonly ILogger<ActivationTaskScheduler> _activationTaskSchedulerLogger;
Expand Down Expand Up @@ -133,7 +133,7 @@ public IGrainContext CreateContext(GrainAddress activationAddress)
}
catch (Exception exception)
{
_grainLogger.LogError(exception, "Failed to construct grain '{GrainId}'.", activationAddress.GrainId);
LogErrorFailedToConstructGrain(_grainLogger, exception, activationAddress.GrainId);
throw;
}
finally
Expand All @@ -143,6 +143,12 @@ public IGrainContext CreateContext(GrainAddress activationAddress)

return context;
}

[LoggerMessage(
Level = LogLevel.Error,
Message = "Failed to construct grain '{GrainId}'."
)]
private static partial void LogErrorFailedToConstructGrain(ILogger logger, Exception exception, GrainId grainId);
}
}

Expand All @@ -159,4 +165,4 @@ public StatelessWorkerActivator(GrainTypeSharedContext sharedContext, IGrainCont

public IGrainContext CreateContext(GrainAddress address) => new StatelessWorkerGrainContext(address, _sharedContext, _innerActivator);
}
}
}
93 changes: 64 additions & 29 deletions src/Orleans.Runtime/Cancellation/GrainCallCancellationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ internal struct GrainCallCancellationRequest(GrainId targetGrainId, GrainId sour
/// <summary>
/// Cancels grain calls issued to remote hosts and handles cancellation requests from other hosts.
/// </summary>
internal class GrainCallCancellationManager : SystemTarget, IGrainCallCancellationManagerSystemTarget, IGrainCallCancellationManager, ILifecycleParticipant<ISiloLifecycle>
internal partial class GrainCallCancellationManager : SystemTarget, IGrainCallCancellationManagerSystemTarget, IGrainCallCancellationManager, ILifecycleParticipant<ISiloLifecycle>
{
private const int MaxBatchSize = 1_000;
private readonly ConcurrentDictionary<SiloAddress, (Task PumpTask, Channel<GrainCallCancellationRequest> WorkItemChannel, CancellationTokenSource Cts)> _workers = new();
Expand Down Expand Up @@ -123,10 +123,7 @@ private async Task ProcessMembershipUpdates()

try
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Monitoring cluster membership updates");
}
LogDebugMonitoringClusterMembershipUpdates(_logger);

var previousSnapshot = _clusterMembershipService.CurrentSnapshot;
await foreach (var snapshot in _clusterMembershipService.MembershipUpdates.WithCancellation(_shuttingDownCts.Token))
Expand All @@ -153,16 +150,13 @@ private async Task ProcessMembershipUpdates()
}
catch (Exception exception)
{
_logger.LogError(exception, "Error processing cluster membership updates");
LogErrorProcessingClusterMembershipUpdates(_logger, exception);
}
}
}
finally
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("No longer monitoring cluster membership updates");
}
LogDebugNoLongerMonitoringClusterMembershipUpdates(_logger);
}
}

Expand All @@ -173,10 +167,7 @@ private async Task PumpCancellationQueue(SiloAddress targetSilo, Channel<GrainCa
var remote = GrainFactory.GetSystemTarget<IGrainCallCancellationManagerSystemTarget>(Constants.CancellationManagerType, targetSilo);
await Task.Yield();

if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Starting cancellation worker for target silo {SiloAddress}", targetSilo);
}
LogDebugStartingCancellationWorker(_logger, targetSilo);

var batch = new List<GrainCallCancellationRequest>();
var reader = workItems.Reader;
Expand All @@ -193,10 +184,7 @@ private async Task PumpCancellationQueue(SiloAddress targetSilo, Channel<GrainCa
// Attempt to cancel the batch.
await remote.CancelCallsAsync(batch).AsTask().WaitAsync(cancellationToken);

if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Cancelled {Count} requests to target silo {SiloAddress}", batch.Count, targetSilo);
}
LogDebugCancelledRequests(_logger, batch.Count, targetSilo);

batch.Clear();
}
Expand All @@ -207,7 +195,7 @@ private async Task PumpCancellationQueue(SiloAddress targetSilo, Channel<GrainCa
break;
}

_logger.LogError(exception, "Error while cancelling {Count} requests to {SiloAddress}", batch.Count, targetSilo);
LogErrorCancellingRequests(_logger, exception, batch.Count, targetSilo);
await Task.Delay(5_000, cancellationToken);
}
}
Expand All @@ -223,10 +211,7 @@ private async Task PumpCancellationQueue(SiloAddress targetSilo, Channel<GrainCa
{
// Remove ourselves and clean up.
RemoveWorker(targetSilo);
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Exiting cancellation worker for target silo {SiloAddress}", targetSilo);
}
LogDebugExitingCancellationWorker(_logger, targetSilo);
}
}

Expand Down Expand Up @@ -262,11 +247,7 @@ private void RemoveWorker(SiloAddress targetSilo)
{
if (_workers.TryRemove(targetSilo, out var entry))
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("Target silo '{SiloAddress}' is no longer active, so this cancellation activation worker is terminating", targetSilo);
}

LogDebugTargetSiloNoLongerActive(_logger, targetSilo);
entry.Cts.Dispose();
}
}
Expand Down Expand Up @@ -299,7 +280,7 @@ private async Task StopAsync(CancellationToken cancellationToken)
}
catch (Exception exception)
{
_logger.LogWarning(exception, "Error signaling shutdown.");
LogWarningErrorSignalingShutdown(_logger, exception);
}

await Task.WhenAll(tasks).WaitAsync(cancellationToken).SuppressThrowing();
Expand All @@ -314,4 +295,58 @@ void ILifecycleParticipant<ISiloLifecycle>.Participate(ISiloLifecycle lifecycle)
ct => this.RunOrQueueTask(() => StartAsync(ct)),
ct => this.RunOrQueueTask(() => StopAsync(ct)));
}

[LoggerMessage(
Level = LogLevel.Debug,
Message = "Monitoring cluster membership updates"
)]
private static partial void LogDebugMonitoringClusterMembershipUpdates(ILogger logger);

[LoggerMessage(
Level = LogLevel.Debug,
Message = "Starting cancellation worker for target silo {SiloAddress}"
)]
private static partial void LogDebugStartingCancellationWorker(ILogger logger, SiloAddress siloAddress);

[LoggerMessage(
Level = LogLevel.Debug,
Message = "Cancelled {Count} requests to target silo {SiloAddress}"
)]
private static partial void LogDebugCancelledRequests(ILogger logger, int count, SiloAddress siloAddress);

[LoggerMessage(
Level = LogLevel.Error,
Message = "Error while cancelling {Count} requests to {SiloAddress}"
)]
private static partial void LogErrorCancellingRequests(ILogger logger, Exception exception, int count, SiloAddress siloAddress);

[LoggerMessage(
Level = LogLevel.Debug,
Message = "Exiting cancellation worker for target silo {SiloAddress}"
)]
private static partial void LogDebugExitingCancellationWorker(ILogger logger, SiloAddress siloAddress);

[LoggerMessage(
Level = LogLevel.Debug,
Message = "Target silo '{SiloAddress}' is no longer active, so this cancellation activation worker is terminating"
)]
private static partial void LogDebugTargetSiloNoLongerActive(ILogger logger, SiloAddress siloAddress);

[LoggerMessage(
Level = LogLevel.Warning,
Message = "Error signaling shutdown."
)]
private static partial void LogWarningErrorSignalingShutdown(ILogger logger, Exception exception);

[LoggerMessage(
Level = LogLevel.Debug,
Message = "No longer monitoring cluster membership updates"
)]
private static partial void LogDebugNoLongerMonitoringClusterMembershipUpdates(ILogger logger);

[LoggerMessage(
Level = LogLevel.Error,
Message = "Error processing cluster membership updates"
)]
private static partial void LogErrorProcessingClusterMembershipUpdates(ILogger logger, Exception exception);
}
8 changes: 7 additions & 1 deletion src/Orleans.Runtime/Catalog/ActivationData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1973,11 +1973,17 @@ void CancelRequest(IInvokable request)
}
catch (Exception exception)
{
Shared.Logger.LogWarning(exception, "One or more cancellation callbacks failed.");
LogErrorCancellationCallbackFailed(Shared.Logger, exception);
}
}
}

[LoggerMessage(
Level = LogLevel.Warning,
Message = "One or more cancellation callbacks failed."
)]
private static partial void LogErrorCancellationCallbackFailed(ILogger logger, Exception exception);

#endregion

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ public async ValueTask<Immutable<List<GrainAddress>>> GetRegisteredActivations(M
var stopwatch = CoarseStopwatch.StartNew();
using var cts = new CancellationTokenSource();
cts.Cancel();

foreach (var (grainId, activation) in _localActivations)
{
var directory = GetGrainDirectory(activation, _grainDirectoryResolver!);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#nullable enable
namespace Orleans.Runtime.MembershipService.SiloMetadata;

internal class SiloMetadataCache(
internal partial class SiloMetadataCache(
ISiloMetadataClient siloMetadataClient,
MembershipTableManager membershipTableManager,
IOptions<ClusterMembershipOptions> clusterMembershipOptions,
Expand All @@ -29,7 +29,7 @@ void ILifecycleParticipant<ISiloLifecycle>.Participate(ISiloLifecycle lifecycle)
Task OnStart(CancellationToken _)
{
// This gives time for the cluster to be voted Dead and for membership updates to propagate that out
negativeCachePeriod = clusterMembershipOptions.Value.ProbeTimeout * clusterMembershipOptions.Value.NumMissedProbesLimit
negativeCachePeriod = clusterMembershipOptions.Value.ProbeTimeout * clusterMembershipOptions.Value.NumMissedProbesLimit
+ (2 * clusterMembershipOptions.Value.TableRefreshTimeout);
task = Task.Run(() => this.ProcessMembershipUpdates(_cts.Token));
return Task.CompletedTask;
Expand All @@ -55,7 +55,7 @@ private async Task ProcessMembershipUpdates(CancellationToken ct)
{
try
{
if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("Starting to process membership updates.");
LogDebugStartProcessingMembershipUpdates(logger);
await foreach (var update in membershipTableManager.MembershipTableUpdates.WithCancellation(ct))
{
// Add entries for members that aren't already in the cache
Expand All @@ -80,7 +80,7 @@ private async Task ProcessMembershipUpdates(CancellationToken ct)
catch(Exception exception)
{
_negativeCache.TryAdd(membershipEntry.Key, now + negativeCachePeriod);
logger.LogError(exception, "Error fetching metadata for silo {Silo}", membershipEntry.Key);
LogErrorFetchingSiloMetadata(logger, exception, membershipEntry.Key);
}
}
}
Expand Down Expand Up @@ -111,11 +111,11 @@ private async Task ProcessMembershipUpdates(CancellationToken ct)
}
catch (Exception exception)
{
logger.LogError(exception, "Error processing membership updates");
LogErrorProcessingMembershipUpdates(logger, exception);
}
finally
{
if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("Stopping membership update processor");
LogDebugStoppingMembershipProcessor(logger);
}
}

Expand All @@ -124,5 +124,25 @@ private async Task ProcessMembershipUpdates(CancellationToken ct)
public void SetMetadata(SiloAddress siloAddress, SiloMetadata metadata) => _metadata.TryAdd(siloAddress, metadata);

public void Dispose() => _cts.Cancel();

[LoggerMessage(
Level = LogLevel.Debug,
Message = "Starting to process membership updates.")]
private static partial void LogDebugStartProcessingMembershipUpdates(ILogger logger);

[LoggerMessage(
Level = LogLevel.Error,
Message = "Error fetching metadata for silo {Silo}")]
private static partial void LogErrorFetchingSiloMetadata(ILogger logger, Exception exception, SiloAddress silo);

[LoggerMessage(
Level = LogLevel.Error,
Message = "Error processing membership updates")]
private static partial void LogErrorProcessingMembershipUpdates(ILogger logger, Exception exception);

[LoggerMessage(
Level = LogLevel.Debug,
Message = "Stopping membership update processor")]
private static partial void LogDebugStoppingMembershipProcessor(ILogger logger);
}

20 changes: 14 additions & 6 deletions src/Orleans.Runtime/Messaging/Gateway.cs
Original file line number Diff line number Diff line change
Expand Up @@ -357,9 +357,7 @@ public void Send(Message msg)
{
_pendingToSend.Enqueue(msg);
_signal.Signal();
#if DEBUG
if (_gateway.logger.IsEnabled(LogLevel.Trace)) _gateway.logger.LogTrace("Queued message {Message} for client {TargetGrain}", msg, msg.TargetGrain);
#endif
LogTraceQueuedMessage(_gateway.logger, msg, msg.TargetGrain);
}

private async Task RunMessageLoop()
Expand Down Expand Up @@ -387,9 +385,7 @@ private async Task RunMessageLoop()
{
if (TrySend(connection, message))
{
#if DEBUG
if (_gateway.logger.IsEnabled(LogLevel.Trace)) _gateway.logger.LogTrace("Sent queued message {Message} to client {ClientId}", message, Id);
#endif
LogTraceSentQueuedMessage(_gateway.logger, message, Id);
}
else
{
Expand Down Expand Up @@ -515,6 +511,18 @@ internal void DropExpiredEntries()
)]
private static partial void LogWarningGatewayClientReceivedNewConnectionBeforePreviousConnectionRemoved(ILogger logger, ClientGrainId clientId, GatewayInboundConnection newConnection, GatewayInboundConnection previousConnection);

[LoggerMessage(
Level = LogLevel.Trace,
Message = "Queued message {Message} for client {TargetGrain}"
)]
private static partial void LogTraceQueuedMessage(ILogger logger, object message, GrainId targetGrain);

[LoggerMessage(
Level = LogLevel.Trace,
Message = "Sent queued message {Message} to client {ClientId}"
)]
private static partial void LogTraceSentQueuedMessage(ILogger logger, object message, ClientGrainId clientId);

[LoggerMessage(
Level = LogLevel.Warning,
Message = "Exception in message loop for client {ClientId}"
Expand Down
Loading
Loading