Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 78 additions & 131 deletions src/Orleans.Runtime/Activation/ActivationDataActivatorProvider.cs
Original file line number Diff line number Diff line change
@@ -1,164 +1,111 @@
using System;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Orleans.Configuration;
using Orleans.GrainReferences;
using Orleans.Metadata;
using Orleans.Runtime.Internal;
using Orleans.Runtime.Scheduler;

namespace Orleans.Runtime
namespace Orleans.Runtime;

internal partial class ActivationDataActivatorProvider(
GrainClassMap grainClassMap,
IServiceProvider serviceProvider,
GrainTypeSharedContextResolver sharedComponentsResolver,
ILogger<WorkItemGroup> workItemGroupLogger,
ILogger<ActivationTaskScheduler> activationTaskSchedulerLogger,
IOptions<SchedulingOptions> schedulingOptions,
IOptions<StatelessWorkerOptions> statelessWorkerOptions) : IGrainContextActivatorProvider
{
internal partial class ActivationDataActivatorProvider : IGrainContextActivatorProvider
public bool TryGet(GrainType grainType, [NotNullWhen(true)] out IGrainContextActivator? activator)
{
if (!grainClassMap.TryGetGrainClass(grainType, out var grainClass) || !typeof(IGrain).IsAssignableFrom(grainClass))
{
activator = null;
return false;
}

var sharedContext = sharedComponentsResolver.GetComponents(grainType);
var instanceActivator = sharedContext.GetComponent<IGrainActivator>();
if (instanceActivator is null)
{
throw new InvalidOperationException($"Could not find a suitable {nameof(IGrainActivator)} implementation for grain type {grainType}");
}

var innerActivator = new ActivationDataActivator(
instanceActivator,
serviceProvider,
sharedContext,
workItemGroupLogger,
activationTaskSchedulerLogger,
schedulingOptions);

if (sharedContext.PlacementStrategy is StatelessWorkerPlacement)
{
var statelessWorkerSharedContext = new StatelessWorkerGrainTypeSharedContext(sharedContext, statelessWorkerOptions);
activator = new StatelessWorkerActivator(statelessWorkerSharedContext, innerActivator);
}
else
{
activator = innerActivator;
}

return true;
}

private partial class ActivationDataActivator : IGrainContextActivator
{
private readonly IServiceProvider _serviceProvider;
private readonly IActivationWorkingSet _activationWorkingSet;
private readonly ILogger<WorkItemGroup> _workItemGroupLogger;
private readonly ILogger<Grain> _grainLogger;
private readonly ILogger<ActivationTaskScheduler> _activationTaskSchedulerLogger;
private readonly IOptions<SchedulingOptions> _schedulingOptions;
private readonly GrainTypeSharedContextResolver _sharedComponentsResolver;
private readonly GrainClassMap _grainClassMap;
private readonly ILoggerFactory _loggerFactory;
private readonly GrainReferenceActivator _grainReferenceActivator;
private readonly IGrainActivator _grainActivator;
private readonly IServiceProvider _serviceProvider;
private readonly GrainTypeSharedContext _sharedComponents;
private readonly Func<IGrainContext, WorkItemGroup> _createWorkItemGroup;
private readonly Action<object?> _startActivation;

public ActivationDataActivatorProvider(
GrainClassMap grainClassMap,
public ActivationDataActivator(
IGrainActivator grainActivator,
IServiceProvider serviceProvider,
ILoggerFactory loggerFactory,
GrainReferenceActivator grainReferenceActivator,
GrainTypeSharedContextResolver sharedComponentsResolver,
IActivationWorkingSet activationWorkingSet,
ILogger<Grain> grainLogger,
GrainTypeSharedContext sharedComponents,
ILogger<WorkItemGroup> workItemGroupLogger,
ILogger<ActivationTaskScheduler> activationTaskSchedulerLogger,
IOptions<SchedulingOptions> schedulingOptions)
{
_activationWorkingSet = activationWorkingSet;
_workItemGroupLogger = workItemGroupLogger;
_grainLogger = grainLogger;
_activationTaskSchedulerLogger = activationTaskSchedulerLogger;
_schedulingOptions = schedulingOptions;
_sharedComponentsResolver = sharedComponentsResolver;
_grainClassMap = grainClassMap;
_grainActivator = grainActivator;
_serviceProvider = serviceProvider;
_loggerFactory = loggerFactory;
_grainReferenceActivator = grainReferenceActivator;
}

public bool TryGet(GrainType grainType, [NotNullWhen(true)] out IGrainContextActivator? activator)
{
if (!_grainClassMap.TryGetGrainClass(grainType, out var grainClass) || !typeof(IGrain).IsAssignableFrom(grainClass))
{
activator = null;
return false;
}

var sharedContext = _sharedComponentsResolver.GetComponents(grainType);
var instanceActivator = sharedContext.GetComponent<IGrainActivator>();
if (instanceActivator is null)
{
throw new InvalidOperationException($"Could not find a suitable {nameof(IGrainActivator)} implementation for grain type {grainType}");
}

var innerActivator = new ActivationDataActivator(
instanceActivator,
_serviceProvider,
sharedContext,
_grainLogger,
_sharedComponents = sharedComponents;
_createWorkItemGroup = context => new WorkItemGroup(
context,
_workItemGroupLogger,
_activationTaskSchedulerLogger,
_schedulingOptions);

if (sharedContext.PlacementStrategy is StatelessWorkerPlacement)
{
activator = new StatelessWorkerActivator(sharedContext, innerActivator);
}
else
{
activator = innerActivator;
}

return true;
_startActivation = state => ((ActivationData)state!).Start(_grainActivator);
}

private partial class ActivationDataActivator : IGrainContextActivator
public IGrainContext CreateContext(GrainAddress activationAddress)
{
private readonly ILogger<WorkItemGroup> _workItemGroupLogger;
private readonly ILogger<ActivationTaskScheduler> _activationTaskSchedulerLogger;
private readonly IOptions<SchedulingOptions> _schedulingOptions;
private readonly IGrainActivator _grainActivator;
private readonly IServiceProvider _serviceProvider;
private readonly GrainTypeSharedContext _sharedComponents;
private readonly ILogger<Grain> _grainLogger;
private readonly Func<IGrainContext, WorkItemGroup> _createWorkItemGroup;
private readonly Action<object?> _startActivation;

public ActivationDataActivator(
IGrainActivator grainActivator,
IServiceProvider serviceProvider,
GrainTypeSharedContext sharedComponents,
ILogger<Grain> grainLogger,
ILogger<WorkItemGroup> workItemGroupLogger,
ILogger<ActivationTaskScheduler> activationTaskSchedulerLogger,
IOptions<SchedulingOptions> schedulingOptions)
{
_workItemGroupLogger = workItemGroupLogger;
_activationTaskSchedulerLogger = activationTaskSchedulerLogger;
_schedulingOptions = schedulingOptions;
_grainActivator = grainActivator;
_serviceProvider = serviceProvider;
_sharedComponents = sharedComponents;
_grainLogger = grainLogger;
_createWorkItemGroup = context => new WorkItemGroup(
context,
_workItemGroupLogger,
_activationTaskSchedulerLogger,
_schedulingOptions);
_startActivation = state => ((ActivationData)state!).Start(_grainActivator);
}

public IGrainContext CreateContext(GrainAddress activationAddress)
{
var context = new ActivationData(
activationAddress,
_createWorkItemGroup,
_serviceProvider,
_sharedComponents);

using var ecSuppressor = ExecutionContext.SuppressFlow();
_ = Task.Factory.StartNew(
_startActivation,
context,
CancellationToken.None,
TaskCreationOptions.DenyChildAttach,
context.ActivationTaskScheduler);
return context;
}
var context = new ActivationData(
activationAddress,
_createWorkItemGroup,
_serviceProvider,
_sharedComponents);

[LoggerMessage(
Level = LogLevel.Error,
Message = "Failed to dispose grain '{GrainId}'."
)]
private static partial void LogErrorFailedToDisposeGrain(ILogger logger, Exception exception, GrainId grainId);
using var ecSuppressor = ExecutionContext.SuppressFlow();
_ = Task.Factory.StartNew(
_startActivation,
context,
CancellationToken.None,
TaskCreationOptions.DenyChildAttach,
context.ActivationTaskScheduler);
return context;
}
}
}

internal class StatelessWorkerActivator : IGrainContextActivator
{
private readonly IGrainContextActivator _innerActivator;
private readonly GrainTypeSharedContext _sharedContext;

public StatelessWorkerActivator(GrainTypeSharedContext sharedContext, IGrainContextActivator innerActivator)
{
_innerActivator = innerActivator;
_sharedContext = sharedContext;
}

public IGrainContext CreateContext(GrainAddress address) => new StatelessWorkerGrainContext(address, _sharedContext, _innerActivator);
}
internal class StatelessWorkerActivator(StatelessWorkerGrainTypeSharedContext sharedContext, IGrainContextActivator innerActivator) : IGrainContextActivator
{
public IGrainContext CreateContext(GrainAddress address) => new StatelessWorkerGrainContext(address, sharedContext, innerActivator);
}
12 changes: 2 additions & 10 deletions src/Orleans.Runtime/Catalog/GrainTypeSharedContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ namespace Orleans.Runtime;
public sealed class GrainTypeSharedContext
{
private readonly IServiceProvider _serviceProvider;
private readonly Dictionary<Type, object> _components = new();
private InternalGrainRuntime? _internalGrainRuntime;
private readonly Dictionary<Type, object> _components = [];

public GrainTypeSharedContext(
GrainType grainType,
Expand All @@ -32,7 +31,6 @@ public GrainTypeSharedContext(
IOptions<SiloMessagingOptions> messagingOptions,
IOptions<GrainCollectionOptions> collectionOptions,
IOptions<SchedulingOptions> schedulingOptions,
IOptions<StatelessWorkerOptions> statelessWorkerOptions,
IGrainRuntime grainRuntime,
ILoggerFactory loggerFactory,
GrainReferenceActivator grainReferenceActivator,
Expand All @@ -56,7 +54,6 @@ public GrainTypeSharedContext(
var grainDirectoryResolver = serviceProvider.GetRequiredService<GrainDirectoryResolver>();
GrainDirectory = PlacementStrategy.IsUsingGrainDirectory ? grainDirectoryResolver.Resolve(grainType) : null;
SchedulingOptions = schedulingOptions.Value;
StatelessWorkerOptions = statelessWorkerOptions.Value;
Runtime = grainRuntime;
MigrationManager = _serviceProvider.GetService<IActivationMigrationManager>();

Expand Down Expand Up @@ -204,11 +201,6 @@ public void SetComponent<TComponent>(TComponent? instance)
/// </summary>
public SchedulingOptions SchedulingOptions { get; }

/// <summary>
/// Gets the stateless worker options.
/// </summary>
public StatelessWorkerOptions StatelessWorkerOptions { get; }

/// <summary>
/// Gets the grain runtime.
/// </summary>
Expand All @@ -222,7 +214,7 @@ public void SetComponent<TComponent>(TComponent? instance)
/// <summary>
/// Gets the internal grain runtime.
/// </summary>
internal InternalGrainRuntime InternalRuntime => _internalGrainRuntime ??= _serviceProvider.GetRequiredService<InternalGrainRuntime>();
internal InternalGrainRuntime InternalRuntime => field ??= _serviceProvider.GetRequiredService<InternalGrainRuntime>();

/// <summary>
/// Called on creation of an activation.
Expand Down
Loading
Loading