diff --git a/src/Orleans.Core/Runtime/Constants.cs b/src/Orleans.Core/Runtime/Constants.cs index 5409d132e59..a405893db65 100644 --- a/src/Orleans.Core/Runtime/Constants.cs +++ b/src/Orleans.Core/Runtime/Constants.cs @@ -16,7 +16,6 @@ internal static class Constants public static readonly GrainType CatalogType = SystemTargetGrainId.CreateGrainType("catalog"); public static readonly GrainType MembershipServiceType = SystemTargetGrainId.CreateGrainType("clustering"); public static readonly GrainType SystemMembershipTableType = SystemTargetGrainId.CreateGrainType("clustering.dev"); - public static readonly GrainType FallbackSystemTargetType = SystemTargetGrainId.CreateGrainType("fallback"); public static readonly GrainType LifecycleSchedulingSystemTargetType = SystemTargetGrainId.CreateGrainType("lifecycle"); public static readonly GrainType DeploymentLoadPublisherSystemTargetType = SystemTargetGrainId.CreateGrainType("load-publisher"); public static readonly GrainType TestHooksSystemTargetType = SystemTargetGrainId.CreateGrainType("test.hooks"); @@ -47,7 +46,6 @@ internal static class Constants {ClientDirectoryType, "ClientDirectory"}, {CatalogType,"Catalog"}, {MembershipServiceType,"MembershipService"}, - {FallbackSystemTargetType, "FallbackSystemTarget"}, {LifecycleSchedulingSystemTargetType, "LifecycleSchedulingSystemTarget"}, {DeploymentLoadPublisherSystemTargetType, "DeploymentLoadPublisherSystemTarget"}, {StreamProviderManagerAgentSystemTargetType,"StreamProviderManagerAgent"}, diff --git a/src/Orleans.Reminders/ReminderService/LocalReminderService.cs b/src/Orleans.Reminders/ReminderService/LocalReminderService.cs index 41b85006761..b990a61e1c1 100644 --- a/src/Orleans.Reminders/ReminderService/LocalReminderService.cs +++ b/src/Orleans.Reminders/ReminderService/LocalReminderService.cs @@ -37,18 +37,15 @@ internal sealed class LocalReminderService : GrainService, IReminderService, ILi public LocalReminderService( GrainReferenceActivator referenceActivator, GrainInterfaceTypeResolver interfaceTypeResolver, - ILocalSiloDetails localSiloDetails, IReminderTable reminderTable, - ILoggerFactory loggerFactory, IAsyncTimerFactory asyncTimerFactory, IOptions reminderOptions, IConsistentRingProvider ringProvider, - Catalog catalog) + SystemTargetShared shared) : base( - SystemTargetGrainId.CreateGrainServiceGrainId(GrainInterfaceUtils.GetGrainClassTypeCode(typeof(IReminderService)), null, localSiloDetails.SiloAddress), - localSiloDetails.SiloAddress, - loggerFactory, - ringProvider) + SystemTargetGrainId.CreateGrainServiceGrainId(GrainInterfaceUtils.GetGrainClassTypeCode(typeof(IReminderService)), null, shared.SiloAddress), + ringProvider, + shared) { _referenceActivator = referenceActivator; _grainInterfaceType = interfaceTypeResolver.GetGrainInterfaceType(typeof(IRemindable)); @@ -57,9 +54,9 @@ public LocalReminderService( this.asyncTimerFactory = asyncTimerFactory; ReminderInstruments.RegisterActiveRemindersObserve(() => localReminders.Count); startedTask = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - this.logger = loggerFactory.CreateLogger(); + this.logger = shared.LoggerFactory.CreateLogger(); this.listRefreshTimer = asyncTimerFactory.Create(this.reminderOptions.RefreshReminderListPeriod, "ReminderService.ReminderListRefresher"); - catalog.RegisterSystemTarget(this); + shared.ActivationDirectory.RecordNewTarget(this); } void ILifecycleParticipant.Participate(ISiloLifecycle observer) diff --git a/src/Orleans.Runtime/Cancellation/GrainCallCancellationManager.cs b/src/Orleans.Runtime/Cancellation/GrainCallCancellationManager.cs index 0d64ae4f950..30a8dc0d494 100644 --- a/src/Orleans.Runtime/Cancellation/GrainCallCancellationManager.cs +++ b/src/Orleans.Runtime/Cancellation/GrainCallCancellationManager.cs @@ -61,17 +61,17 @@ public GrainCallCancellationManager( IServiceProvider serviceProvider, Catalog catalog, ActivationDirectory activationDirectory, - IClusterMembershipService clusterMembershipService) : base(Constants.CancellationManagerType, localSiloDetails.SiloAddress, loggerFactory) + IClusterMembershipService clusterMembershipService, + SystemTargetShared shared) : base(Constants.CancellationManagerType, shared) { _serviceProvider = serviceProvider; _logger = loggerFactory.CreateLogger(); _catalog = catalog; _activationDirectory = activationDirectory; _clusterMembershipService = clusterMembershipService; - _catalog.RegisterSystemTarget(this); + using (new ExecutionContextSuppressor()) { - using var _ = new ExecutionContextSuppressor(); _membershipUpdatesTask = Task.Factory.StartNew( state => ((GrainCallCancellationManager)state!).ProcessMembershipUpdates(), this, @@ -80,6 +80,8 @@ public GrainCallCancellationManager( WorkItemGroup.TaskScheduler).Unwrap(); _membershipUpdatesTask.Ignore(); } + + shared.ActivationDirectory.RecordNewTarget(this); } private IInternalGrainFactory GrainFactory => _grainFactory ??= _serviceProvider.GetRequiredService(); diff --git a/src/Orleans.Runtime/Catalog/ActivationMigrationManager.cs b/src/Orleans.Runtime/Catalog/ActivationMigrationManager.cs index d0335d9a95d..3180408f2ab 100644 --- a/src/Orleans.Runtime/Catalog/ActivationMigrationManager.cs +++ b/src/Orleans.Runtime/Catalog/ActivationMigrationManager.cs @@ -53,7 +53,7 @@ internal interface IActivationMigrationManager /// /// Migrates grain activations to target hosts and handles migration requests from other hosts. /// -internal partial class ActivationMigrationManager : SystemTarget, IActivationMigrationManagerSystemTarget, IActivationMigrationManager, ILifecycleParticipant +internal sealed partial class ActivationMigrationManager : SystemTarget, IActivationMigrationManagerSystemTarget, IActivationMigrationManager, ILifecycleParticipant { private const int MaxBatchSize = 1_000; private readonly ConcurrentDictionary WorkItemChannel)> _workers = new(); @@ -74,13 +74,14 @@ public ActivationMigrationManager( ILoggerFactory loggerFactory, IInternalGrainFactory grainFactory, Catalog catalog, - IClusterMembershipService clusterMembershipService) : base(Constants.ActivationMigratorType, localSiloDetails.SiloAddress, loggerFactory) + SystemTargetShared shared, + IClusterMembershipService clusterMembershipService) : base(Constants.ActivationMigratorType, shared) { _grainFactory = grainFactory; _logger = loggerFactory.CreateLogger(); _catalog = catalog; _clusterMembershipService = clusterMembershipService; - _catalog.RegisterSystemTarget(this); + shared.ActivationDirectory.RecordNewTarget(this); { using var _ = new ExecutionContextSuppressor(); diff --git a/src/Orleans.Runtime/Catalog/Catalog.cs b/src/Orleans.Runtime/Catalog/Catalog.cs index 91b433005da..4c603aed081 100644 --- a/src/Orleans.Runtime/Catalog/Catalog.cs +++ b/src/Orleans.Runtime/Catalog/Catalog.cs @@ -5,24 +5,21 @@ using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using Orleans.Configuration; using Orleans.GrainDirectory; using Orleans.Runtime.GrainDirectory; -using Orleans.Runtime.Scheduler; namespace Orleans.Runtime { - internal sealed partial class Catalog : SystemTarget, ICatalog + internal sealed partial class Catalog : SystemTarget, ICatalog, ILifecycleParticipant { - public SiloAddress LocalSilo { get; private set; } - internal ISiloStatusOracle SiloStatusOracle { get; set; } + private readonly SiloAddress _siloAddress; private readonly ActivationCollector activationCollector; private readonly GrainDirectoryResolver grainDirectoryResolver; private readonly ActivationDirectory activations; private readonly IServiceProvider serviceProvider; private readonly ILogger logger; private readonly GrainContextActivator grainActivator; + private ISiloStatusOracle _siloStatusOracle; public Catalog( ILocalSiloDetails localSiloDetails, @@ -31,17 +28,17 @@ public Catalog( ActivationCollector activationCollector, IServiceProvider serviceProvider, ILoggerFactory loggerFactory, - GrainContextActivator grainActivator) - : base(Constants.CatalogType, localSiloDetails.SiloAddress, loggerFactory) + GrainContextActivator grainActivator, + SystemTargetShared shared) + : base(Constants.CatalogType, shared) { - this.LocalSilo = localSiloDetails.SiloAddress; + this._siloAddress = localSiloDetails.SiloAddress; this.grainDirectoryResolver = grainDirectoryResolver; this.activations = activationDirectory; this.serviceProvider = serviceProvider; this.grainActivator = grainActivator; this.logger = loggerFactory.CreateLogger(); this.activationCollector = activationCollector; - this.RuntimeClient = serviceProvider.GetRequiredService(); GC.GetTotalMemory(true); // need to call once w/true to ensure false returns OK value @@ -58,7 +55,7 @@ public Catalog( return counter; }); - RegisterSystemTarget(this); + shared.ActivationDirectory.RecordNewTarget(this); } /// @@ -94,29 +91,6 @@ internal int UnregisterGrainForTesting(GrainId grain) return 1; } - public void RegisterSystemTarget(ISystemTarget target) - { - var systemTarget = target as SystemTarget; - if (systemTarget == null) throw new ArgumentException($"Parameter must be of type {typeof(SystemTarget)}", nameof(target)); - systemTarget.RuntimeClient = this.RuntimeClient; - var sp = this.serviceProvider; - systemTarget.WorkItemGroup = new WorkItemGroup( - systemTarget, - sp.GetRequiredService>(), - sp.GetRequiredService>(), - sp.GetRequiredService>()); - activations.RecordNewTarget(systemTarget); - } - - public void UnregisterSystemTarget(ISystemTarget target) - { - var systemTarget = target as SystemTarget; - if (systemTarget == null) throw new ArgumentException($"Parameter must be of type {typeof(SystemTarget)}", nameof(target)); - activations.RemoveTarget(systemTarget); - } - - public int ActivationCount { get { return activations.Count; } } - /// /// If activation already exists, return it. /// Otherwise, creates a new activation, begins rehydrating it and activating it, then returns it. @@ -154,7 +128,7 @@ public IGrainContext GetOrCreateActivation( return result; } - if (!SiloStatusOracle.CurrentStatus.IsTerminating()) + if (!_siloStatusOracle.CurrentStatus.IsTerminating()) { var address = new GrainAddress { @@ -191,7 +165,7 @@ public IGrainContext GetOrCreateActivation( static IGrainContext UnableToCreateActivation(Catalog self, GrainId grainId) { // Did not find and did not start placing new - var isTerminating = self.SiloStatusOracle.CurrentStatus.IsTerminating(); + var isTerminating = self._siloStatusOracle.CurrentStatus.IsTerminating(); if (isTerminating) { self.LogDebugUnableToCreateActivationTerminating(grainId); @@ -292,14 +266,6 @@ await Parallel.ForEachAsync(activations, options, (kv, _) => }).WaitAsync(cancellationToken); } - public SiloStatus LocalSiloStatus - { - get - { - return SiloStatusOracle.CurrentStatus; - } - } - public async Task DeleteActivations(List addresses, DeactivationReasonCode reasonCode, string reasonText) { var tasks = new List(addresses.Count); @@ -320,7 +286,7 @@ await Parallel.ForEachAsync(addresses, (activationAddress, cancellationToken) => internal void OnSiloStatusChange(ILocalGrainDirectory directory, SiloAddress updatedSilo, SiloStatus status) { // ignore joining events and also events on myself. - if (updatedSilo.Equals(LocalSilo)) return; + if (updatedSilo.Equals(_siloAddress)) return; // We deactivate those activations when silo goes either of ShuttingDown/Stopping/Dead states, // since this is what Directory is doing as well. Directory removes a silo based on all those 3 statuses, @@ -386,6 +352,12 @@ void StartDeactivatingActivations(DeactivationReason reason, List } } + void ILifecycleParticipant.Participate(ISiloLifecycle lifecycle) + { + // Do nothing, just ensure that this instance is created so that it can register itself in the activation directory. + _siloStatusOracle = serviceProvider.GetRequiredService(); + } + private readonly struct SiloAddressLogValue(SiloAddress silo) { public override string ToString() => silo.ToStringWithHashCode(); diff --git a/src/Orleans.Runtime/Catalog/SystemTargetShared.cs b/src/Orleans.Runtime/Catalog/SystemTargetShared.cs new file mode 100644 index 00000000000..9c1ddb5ed60 --- /dev/null +++ b/src/Orleans.Runtime/Catalog/SystemTargetShared.cs @@ -0,0 +1,41 @@ +#nullable enable +using System; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Orleans.Configuration; +using Orleans.GrainReferences; +using Orleans.Runtime.Scheduler; +using Orleans.Timers; + +namespace Orleans.Runtime; + +internal sealed class SystemTargetShared( + InsideRuntimeClient runtimeClient, + ILocalSiloDetails localSiloDetails, + ILoggerFactory loggerFactory, + IOptions schedulingOptions, + GrainReferenceActivator grainReferenceActivator, + ITimerRegistry timerRegistry, + ActivationDirectory activations) +{ + private readonly ILogger _workItemGroupLogger = loggerFactory.CreateLogger(); + private readonly ILogger _activationTaskSchedulerLogger = loggerFactory.CreateLogger(); + public SiloAddress SiloAddress => localSiloDetails.SiloAddress; + + public ILoggerFactory LoggerFactory => loggerFactory; + public GrainReferenceActivator GrainReferenceActivator => grainReferenceActivator; + public ITimerRegistry TimerRegistry => timerRegistry; + + public RuntimeMessagingTrace MessagingTrace => new(loggerFactory); + public InsideRuntimeClient RuntimeClient => runtimeClient; + public ActivationDirectory ActivationDirectory => activations; + public WorkItemGroup CreateWorkItemGroup(SystemTarget systemTarget) + { + ArgumentNullException.ThrowIfNull(systemTarget); + return new WorkItemGroup( + systemTarget, + _workItemGroupLogger, + _activationTaskSchedulerLogger, + schedulingOptions); + } +} diff --git a/src/Orleans.Runtime/Core/SystemTarget.cs b/src/Orleans.Runtime/Core/SystemTarget.cs index 4382c8068be..c9b079787e6 100644 --- a/src/Orleans.Runtime/Core/SystemTarget.cs +++ b/src/Orleans.Runtime/Core/SystemTarget.cs @@ -5,10 +5,8 @@ using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; -using Orleans.GrainReferences; using Orleans.Runtime.Scheduler; using Orleans.Serialization.Invocation; -using Orleans.Timers; namespace Orleans.Runtime { @@ -19,78 +17,67 @@ namespace Orleans.Runtime /// public abstract partial class SystemTarget : ISystemTarget, ISystemTargetBase, IGrainContext, IGrainExtensionBinder, ISpanFormattable, IDisposable, IGrainTimerRegistry { - private readonly SystemTargetGrainId id; + private readonly SystemTargetGrainId _id; + private readonly SystemTargetShared _shared; private readonly HashSet _timers = []; - private GrainReference selfReference; - private Message running; + private GrainReference _selfReference; + private Message _running; private Dictionary _components = new Dictionary(); /// Silo address of the system target. - public SiloAddress Silo { get; } + public SiloAddress Silo => _shared.SiloAddress; internal GrainAddress ActivationAddress { get; } internal ActivationId ActivationId { get; set; } - private InsideRuntimeClient runtimeClient; - private RuntimeMessagingTrace messagingTrace; - private readonly ILogger logger; + private readonly ILogger _logger; - internal InsideRuntimeClient RuntimeClient - { - get - { - if (this.runtimeClient == null) - throw new OrleansException( - $"{nameof(this.RuntimeClient)} has not been set on {this.GetType()}. Most likely, this means that the system target was not registered."); - return this.runtimeClient; - } - set { this.runtimeClient = value; } - } + internal InsideRuntimeClient RuntimeClient => _shared.RuntimeClient; /// - public GrainReference GrainReference => selfReference ??= this.RuntimeClient.ServiceProvider.GetRequiredService().CreateReference(this.id.GrainId, default); + public GrainReference GrainReference => _selfReference ??= _shared.GrainReferenceActivator.CreateReference(_id.GrainId, default); /// - public GrainId GrainId => this.id.GrainId; + public GrainId GrainId => _id.GrainId; /// object IGrainContext.GrainInstance => this; /// - ActivationId IGrainContext.ActivationId => this.ActivationId; + ActivationId IGrainContext.ActivationId => ActivationId; /// - GrainAddress IGrainContext.Address => this.ActivationAddress; + GrainAddress IGrainContext.Address => ActivationAddress; - private RuntimeMessagingTrace MessagingTrace => this.messagingTrace ??= this.RuntimeClient.ServiceProvider.GetRequiredService(); + private RuntimeMessagingTrace MessagingTrace => _shared.MessagingTrace; /// Only needed to make Reflection happy. protected SystemTarget() { } - internal SystemTarget(GrainType grainType, SiloAddress siloAddress, ILoggerFactory loggerFactory) - : this(SystemTargetGrainId.Create(grainType, siloAddress), siloAddress, loggerFactory) + internal SystemTarget(GrainType grainType, SystemTargetShared shared) + : this(SystemTargetGrainId.Create(grainType, shared.SiloAddress), shared) { } - internal SystemTarget(SystemTargetGrainId grainId, SiloAddress silo, ILoggerFactory loggerFactory) + internal SystemTarget(SystemTargetGrainId grainId, SystemTargetShared shared) { - this.id = grainId; - this.Silo = silo; - this.ActivationId = ActivationId.GetDeterministic(grainId.GrainId); - this.ActivationAddress = GrainAddress.GetAddress(this.Silo, this.id.GrainId, this.ActivationId); - this.logger = loggerFactory.CreateLogger(this.GetType()); - + _id = grainId; + _shared = shared; + ActivationId = ActivationId.GetDeterministic(grainId.GrainId); + ActivationAddress = GrainAddress.GetAddress(Silo, _id.GrainId, ActivationId); + _logger = shared.LoggerFactory.CreateLogger(GetType()); + WorkItemGroup = _shared.CreateWorkItemGroup(this); if (!Constants.IsSingletonSystemTarget(GrainId.Type)) { GrainInstruments.IncrementSystemTargetCounts(Constants.SystemTargetName(GrainId.Type)); } } - internal WorkItemGroup WorkItemGroup { get; set; } + internal WorkItemGroup WorkItemGroup { get; } /// - public IServiceProvider ActivationServices => this.RuntimeClient.ServiceProvider; + public IServiceProvider ActivationServices => RuntimeClient.ServiceProvider; /// IGrainLifecycle IGrainContext.ObservableLifecycle => throw new NotImplementedException("IGrainContext.ObservableLifecycle is not implemented by SystemTarget"); @@ -146,14 +133,14 @@ public void SetComponent(TComponent instance) where TComponent : cla internal void HandleNewRequest(Message request) { - running = request; - this.RuntimeClient.Invoke(this, request).Ignore(); + _running = request; + RuntimeClient.Invoke(this, request).Ignore(); } internal void HandleResponse(Message response) { - running = response; - this.RuntimeClient.ReceiveResponse(response); + _running = response; + RuntimeClient.ReceiveResponse(response); } /// @@ -176,7 +163,7 @@ internal void HandleResponse(Message response) public IGrainTimer RegisterTimer(Func callback, object state, TimeSpan dueTime, TimeSpan period) { ArgumentNullException.ThrowIfNull(callback); - var timer = this.ActivationServices.GetRequiredService() + var timer = _shared.TimerRegistry .RegisterGrainTimer(this, static (state, _) => state.Callback(state.State), (Callback: callback, State: state), new() { DueTime = dueTime, Period = period, Interleave = true }); return timer; } @@ -201,7 +188,7 @@ public IGrainTimer RegisterGrainTimer(Func callback, Ti { CheckRuntimeContext(); ArgumentNullException.ThrowIfNull(callback); - var timer = this.ActivationServices.GetRequiredService() + var timer = _shared.TimerRegistry .RegisterGrainTimer(this, (state, ct) => state(ct), callback, new() { DueTime = dueTime, Period = period, Interleave = true }); return timer; } @@ -228,7 +215,7 @@ public IGrainTimer RegisterGrainTimer(Func() + var timer = _shared.TimerRegistry .RegisterGrainTimer(this, callback, state, new() { DueTime = dueTime, Period = period, Interleave = true }); return timer; } @@ -239,10 +226,10 @@ public IGrainTimer RegisterGrainTimer(Func ToString(); bool ISpanFormattable.TryFormat(Span destination, out int charsWritten, ReadOnlySpan format, IFormatProvider provider) - => destination.TryWrite($"[SystemTarget: {Silo}/{id}{ActivationId}]", out charsWritten); + => destination.TryWrite($"[SystemTarget: {Silo}/{_id}{ActivationId}]", out charsWritten); /// Adds details about message currently being processed - internal string ToDetailedString() => $"{this} CurrentlyExecuting={running}{(running != null ? null : "null")}"; + internal string ToDetailedString() => $"{this} CurrentlyExecuting={_running}{(_running != null ? null : "null")}"; /// bool IEquatable.Equals(IGrainContext other) => ReferenceEquals(this, other); @@ -253,7 +240,7 @@ bool ISpanFormattable.TryFormat(Span destination, out int charsWritten, Re where TExtensionInterface : class, IGrainExtension { TExtension implementation; - if (this.GetComponent() is object existing) + if (GetComponent() is object existing) { if (existing is TExtension typedResult) { @@ -267,26 +254,26 @@ bool ISpanFormattable.TryFormat(Span destination, out int charsWritten, Re else { implementation = newExtensionFunc(); - this.SetComponent(implementation); + SetComponent(implementation); } - var reference = this.GrainReference.Cast(); + var reference = GrainReference.Cast(); return (implementation, reference); } /// TComponent ITargetHolder.GetComponent() { - var result = this.GetComponent(); + var result = GetComponent(); if (result is null && typeof(IGrainExtension).IsAssignableFrom(typeof(TComponent))) { - var implementation = this.ActivationServices.GetKeyedService(typeof(TComponent)); + var implementation = ActivationServices.GetKeyedService(typeof(TComponent)); if (implementation is not TComponent typedResult) { throw new GrainExtensionNotInstalledException($"No extension of type {typeof(TComponent)} is installed on this instance and no implementations are registered for automated install"); } - this.SetComponent(typedResult); + SetComponent(typedResult); result = typedResult; } @@ -297,18 +284,18 @@ TComponent ITargetHolder.GetComponent() public TExtensionInterface GetExtension() where TExtensionInterface : class, IGrainExtension { - if (this.GetComponent() is TExtensionInterface result) + if (GetComponent() is TExtensionInterface result) { return result; } - var implementation = this.ActivationServices.GetKeyedService(typeof(TExtensionInterface)); + var implementation = ActivationServices.GetKeyedService(typeof(TExtensionInterface)); if (!(implementation is TExtensionInterface typedResult)) { throw new GrainExtensionNotInstalledException($"No extension of type {typeof(TExtensionInterface)} is installed on this instance and no implementations are registered for automated install"); } - this.SetComponent(typedResult); + SetComponent(typedResult); return typedResult; } @@ -321,14 +308,14 @@ public void ReceiveMessage(object message) case Message.Directions.Request: case Message.Directions.OneWay: { - this.MessagingTrace.OnEnqueueMessageOnActivation(msg, this); + MessagingTrace.OnEnqueueMessageOnActivation(msg, this); var workItem = new RequestWorkItem(this, msg); - this.WorkItemGroup.QueueWorkItem(workItem); + WorkItemGroup.QueueWorkItem(workItem); break; } default: - LogInvalidMessage(this.logger, msg); + LogInvalidMessage(_logger, msg); break; } } diff --git a/src/Orleans.Runtime/GrainDirectory/ClientDirectory.cs b/src/Orleans.Runtime/GrainDirectory/ClientDirectory.cs index 5188da7981c..be63a1e747c 100644 --- a/src/Orleans.Runtime/GrainDirectory/ClientDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/ClientDirectory.cs @@ -65,8 +65,8 @@ public ClientDirectory( IClusterMembershipService clusterMembershipService, IAsyncTimerFactory timerFactory, IConnectedClientCollection connectedClients, - Catalog catalog) - : base(Constants.ClientDirectoryType, siloDetails.SiloAddress, loggerFactory) + SystemTargetShared shared) + : base(Constants.ClientDirectoryType, shared) { _consistentRing = new SimpleConsistentRingProvider(siloDetails, clusterMembershipService); _grainFactory = grainFactory; @@ -78,7 +78,7 @@ public ClientDirectory( _connectedClients = connectedClients; _localHostedClientId = HostedClient.CreateHostedClientGrainId(_localSilo).GrainId; _schedulePublishUpdate = SchedulePublishUpdates; - catalog?.RegisterSystemTarget(this); + shared.ActivationDirectory.RecordNewTarget(this); } public ValueTask> Lookup(GrainId grainId) diff --git a/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs index 12fad70b9af..9bb7e2513e8 100644 --- a/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/DistributedGrainDirectory.cs @@ -77,25 +77,28 @@ internal sealed partial class DistributedGrainDirectory : SystemTarget, IGrainDi // for each recovery version. private long _recoveryMembershipVersion; private Task _runTask = Task.CompletedTask; + private ActivationDirectory _localActivations; + private GrainDirectoryResolver? _grainDirectoryResolver; public DistributedGrainDirectory( DirectoryMembershipService membershipService, ILogger logger, - ILocalSiloDetails localSiloDetails, - ILoggerFactory loggerFactory, IServiceProvider serviceProvider, - IInternalGrainFactory grainFactory) : base(Constants.GrainDirectoryType, localSiloDetails.SiloAddress, loggerFactory) + IInternalGrainFactory grainFactory, + SystemTargetShared shared) : base(Constants.GrainDirectoryType, shared) { + _localActivations = shared.ActivationDirectory; _serviceProvider = serviceProvider; _membershipService = membershipService; _logger = logger; var partitions = ImmutableArray.CreateBuilder(DirectoryMembershipSnapshot.PartitionsPerSilo); for (var i = 0; i < DirectoryMembershipSnapshot.PartitionsPerSilo; i++) { - partitions.Add(new GrainDirectoryPartition(i, this, localSiloDetails, loggerFactory, serviceProvider, grainFactory)); + partitions.Add(new GrainDirectoryPartition(i, this, grainFactory, shared)); } _partitions = partitions.ToImmutable(); + shared.ActivationDirectory.RecordNewTarget(this); } public async Task Lookup(GrainId grainId) => await InvokeAsync( @@ -219,16 +222,15 @@ public async ValueTask>> GetRegisteredActivations(M Interlocked.CompareExchange(ref _recoveryMembershipVersion, membershipVersion.Value, recoveryMembershipVersion); } - var localActivations = _serviceProvider.GetRequiredService(); - var grainDirectoryResolver = _serviceProvider.GetRequiredService(); List result = []; List deactivationTasks = []; var stopwatch = CoarseStopwatch.StartNew(); using var cts = new CancellationTokenSource(); cts.Cancel(); - foreach (var (grainId, activation) in localActivations) + + foreach (var (grainId, activation) in _localActivations) { - var directory = GetGrainDirectory(activation, grainDirectoryResolver); + var directory = GetGrainDirectory(activation, _grainDirectoryResolver!); if (directory == this) { var address = activation.Address; @@ -301,6 +303,8 @@ public async ValueTask>> GetRegisteredActivations(M void ILifecycleParticipant.Participate(ISiloLifecycle observer) { + _grainDirectoryResolver = _serviceProvider.GetRequiredService(); + observer.Subscribe(nameof(DistributedGrainDirectory), ServiceLifecycleStage.RuntimeInitialize, OnRuntimeInitializeStart, OnRuntimeInitializeStop); // Transition into 'ShuttingDown'/'Stopping' stage, removing ourselves from directory membership, but allow some time for hand-off before transitioning to 'Dead'. @@ -308,13 +312,6 @@ void ILifecycleParticipant.Participate(ISiloLifecycle observer) Task OnRuntimeInitializeStart(CancellationToken cancellationToken) { - var catalog = _serviceProvider.GetRequiredService(); - catalog.RegisterSystemTarget(this); - foreach (var partition in _partitions) - { - catalog.RegisterSystemTarget(partition); - } - using var _ = new ExecutionContextSuppressor(); WorkItemGroup.QueueAction(() => _runTask = ProcessMembershipUpdates()); diff --git a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.cs b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.cs index 3c1577aa6c8..b029a1f2cd7 100644 --- a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.cs +++ b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.cs @@ -19,25 +19,16 @@ namespace Orleans.Runtime.GrainDirectory; /// /// Represents a single contiguous partition of the distributed grain directory. /// -/// The index of this partition on this silo. Each silo hosts a fixed number of dynamically sized partitions. -internal sealed partial class GrainDirectoryPartition( - int partitionIndex, - DistributedGrainDirectory owner, - ILocalSiloDetails localSiloDetails, - ILoggerFactory loggerFactory, - IServiceProvider serviceProvider, - IInternalGrainFactory grainFactory) - : SystemTarget(CreateGrainId(localSiloDetails.SiloAddress, partitionIndex), localSiloDetails.SiloAddress, loggerFactory), IGrainDirectoryPartition, IGrainDirectoryTestHooks +internal sealed partial class GrainDirectoryPartition : SystemTarget, IGrainDirectoryPartition, IGrainDirectoryTestHooks { internal static SystemTargetGrainId CreateGrainId(SiloAddress siloAddress, int partitionIndex) => SystemTargetGrainId.Create(Constants.GrainDirectoryPartitionType, siloAddress, partitionIndex.ToString(CultureInfo.InvariantCulture)); private readonly Dictionary _directory = []; - private readonly int _partitionIndex = partitionIndex; - private readonly DistributedGrainDirectory _owner = owner; - private readonly IServiceProvider _serviceProvider = serviceProvider; - private readonly IInternalGrainFactory _grainFactory = grainFactory; + private readonly int _partitionIndex; + private readonly DistributedGrainDirectory _owner; + private readonly IInternalGrainFactory _grainFactory; private readonly CancellationTokenSource _drainSnapshotsCts = new(); - private readonly SiloAddress _id = localSiloDetails.SiloAddress; - private readonly ILogger _logger = loggerFactory.CreateLogger(); + private readonly SiloAddress _id; + private readonly ILogger _logger; private readonly TaskCompletionSource _snapshotsDrainedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously); private readonly AsyncEnumerable _viewUpdates = new( DirectoryMembershipSnapshot.Default, @@ -57,6 +48,21 @@ internal sealed partial class GrainDirectoryPartition( private RingRange _currentRange; + /// The index of this partition on this silo. Each silo hosts a fixed number of dynamically sized partitions. + public GrainDirectoryPartition( + int partitionIndex, + DistributedGrainDirectory owner, + IInternalGrainFactory grainFactory, + SystemTargetShared shared) : base(CreateGrainId(shared.SiloAddress, partitionIndex), shared) + { + _partitionIndex = partitionIndex; + _owner = owner; + _grainFactory = grainFactory; + _id = shared.SiloAddress; + _logger = shared.LoggerFactory.CreateLogger(); + shared.ActivationDirectory.RecordNewTarget(this); + } + // The current directory membership snapshot. public DirectoryMembershipSnapshot CurrentView { get; private set; } = DirectoryMembershipSnapshot.Default; diff --git a/src/Orleans.Runtime/GrainDirectory/ILocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/ILocalGrainDirectory.cs index ced201f7048..3723ebcffcb 100644 --- a/src/Orleans.Runtime/GrainDirectory/ILocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/ILocalGrainDirectory.cs @@ -103,11 +103,5 @@ internal interface ILocalGrainDirectory : IDhtGrainDirectory /// the address of the silo /// true if the silo is known to be part of this cluster bool IsSiloInCluster(SiloAddress silo); - - /// - /// Sets the callback to which is called when a silo is removed from membership. - /// - /// The callback. - void SetSiloRemovedCatalogCallback(Action catalogOnSiloRemoved); } } diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs index 64fb6970f44..16d64fe7ea5 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs @@ -23,13 +23,13 @@ internal sealed class LocalGrainDirectory : ILocalGrainDirectory, ISiloStatusLis private readonly IInternalGrainFactory grainFactory; private readonly object writeLock = new object(); private readonly IServiceProvider _serviceProvider; - private Action? catalogOnSiloRemoved; private DirectoryMembership directoryMembership = DirectoryMembership.Default; // Consider: move these constants into an appropriate place internal const int HOP_LIMIT = 6; // forward a remote request no more than 5 times public static readonly TimeSpan RETRY_DELAY = TimeSpan.FromMilliseconds(200); // Pause 200ms between forwards to let the membership directory settle down internal bool Running; + private Catalog? _catalog; internal SiloAddress MyAddress { get; } @@ -49,7 +49,8 @@ public LocalGrainDirectory( Factory grainDirectoryPartitionFactory, IOptions developmentClusterMembershipOptions, IOptions grainDirectoryOptions, - ILoggerFactory loggerFactory) + ILoggerFactory loggerFactory, + SystemTargetShared systemTargetShared) { this.log = loggerFactory.CreateLogger(); @@ -75,12 +76,8 @@ public LocalGrainDirectory( DirectoryPartition = grainDirectoryPartitionFactory(); HandoffManager = new GrainDirectoryHandoffManager(this, siloStatusOracle, grainFactory, grainDirectoryPartitionFactory, loggerFactory); - RemoteGrainDirectory = new RemoteGrainDirectory(this, Constants.DirectoryServiceType, loggerFactory); - CacheValidator = new RemoteGrainDirectory(this, Constants.DirectoryCacheValidatorType, loggerFactory); - var catalog = serviceProvider.GetRequiredService(); - catalog.RegisterSystemTarget(RemoteGrainDirectory); - catalog.RegisterSystemTarget(CacheValidator); - SetSiloRemovedCatalogCallback(catalog.OnSiloStatusChange); + RemoteGrainDirectory = new RemoteGrainDirectory(this, Constants.DirectoryServiceType, systemTargetShared); + CacheValidator = new RemoteGrainDirectory(this, Constants.DirectoryCacheValidatorType, systemTargetShared); // add myself to the list of members AddServer(MyAddress); @@ -138,16 +135,6 @@ public async Task StopAsync() DirectoryCache.Clear(); } - /// - public void SetSiloRemovedCatalogCallback(Action callback) - { - if (callback == null) throw new ArgumentNullException(nameof(callback)); - lock (this.writeLock) - { - this.catalogOnSiloRemoved = callback; - } - } - private void AddServer(SiloAddress silo) { lock (this.writeLock) @@ -187,7 +174,8 @@ private void RemoveServer(SiloAddress silo, SiloStatus status) try { // Only notify the catalog once. Order is important: call BEFORE updating membershipRingList. - this.catalogOnSiloRemoved?.Invoke(this, silo, status); + _catalog = _serviceProvider.GetRequiredService(); + _catalog.OnSiloStatusChange(this, silo, status); } catch (Exception exc) { diff --git a/src/Orleans.Runtime/GrainDirectory/RemoteGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/RemoteGrainDirectory.cs index b447c983509..8fc2dbdbafd 100644 --- a/src/Orleans.Runtime/GrainDirectory/RemoteGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/RemoteGrainDirectory.cs @@ -14,12 +14,13 @@ internal sealed partial class RemoteGrainDirectory : SystemTarget, IRemoteGrainD private readonly LocalGrainDirectoryPartition partition; private readonly ILogger logger; - internal RemoteGrainDirectory(LocalGrainDirectory r, GrainType grainType, ILoggerFactory loggerFactory) - : base(grainType, r.MyAddress, loggerFactory) + internal RemoteGrainDirectory(LocalGrainDirectory localGrainDirectory, GrainType grainType, SystemTargetShared shared) + : base(grainType, shared) { - router = r; - partition = r.DirectoryPartition; - logger = loggerFactory.CreateLogger($"{typeof(RemoteGrainDirectory).FullName}.CacheValidator"); + router = localGrainDirectory; + partition = localGrainDirectory.DirectoryPartition; + logger = shared.LoggerFactory.CreateLogger($"{typeof(RemoteGrainDirectory).FullName}.CacheValidator"); + shared.ActivationDirectory.RecordNewTarget(this); } public Task RegisterAsync(GrainAddress address, GrainAddress? previousAddress, int hopCount) diff --git a/src/Orleans.Runtime/GrainTypeManager/ClusterManifestSystemTarget.cs b/src/Orleans.Runtime/GrainTypeManager/ClusterManifestSystemTarget.cs index efe95486399..cb35f8b8d40 100644 --- a/src/Orleans.Runtime/GrainTypeManager/ClusterManifestSystemTarget.cs +++ b/src/Orleans.Runtime/GrainTypeManager/ClusterManifestSystemTarget.cs @@ -1,14 +1,9 @@ -using System.Collections.Generic; -using System.Collections.Immutable; -using System.Linq; -using System.Runtime.CompilerServices; using System.Threading.Tasks; -using Microsoft.Extensions.Logging; using Orleans.Metadata; namespace Orleans.Runtime { - internal class ClusterManifestSystemTarget : SystemTarget, IClusterManifestSystemTarget, ISiloManifestSystemTarget + internal sealed class ClusterManifestSystemTarget : SystemTarget, IClusterManifestSystemTarget, ISiloManifestSystemTarget, ILifecycleParticipant { private readonly GrainManifest _siloManifest; private readonly IClusterMembershipService _clusterMembershipService; @@ -20,13 +15,13 @@ internal class ClusterManifestSystemTarget : SystemTarget, IClusterManifestSyste public ClusterManifestSystemTarget( IClusterMembershipService clusterMembershipService, IClusterManifestProvider clusterManifestProvider, - ILocalSiloDetails siloDetails, - ILoggerFactory loggerFactory) - : base(Constants.ManifestProviderType, siloDetails.SiloAddress, loggerFactory) + SystemTargetShared shared) + : base(Constants.ManifestProviderType, shared) { _siloManifest = clusterManifestProvider.LocalGrainManifest; _clusterMembershipService = clusterMembershipService; _clusterManifestProvider = clusterManifestProvider; + shared.ActivationDirectory.RecordNewTarget(this); } public ValueTask GetClusterManifest() => new(_clusterManifestProvider.Current); @@ -67,5 +62,9 @@ public ValueTask GetClusterManifestUpdate(MajorMinorVersi } public ValueTask GetSiloManifest() => new(_siloManifest); + void ILifecycleParticipant.Participate(ISiloLifecycle lifecycle) + { + // We don't participate in any lifecycle stages: activating this instance is all that is necessary. + } } } \ No newline at end of file diff --git a/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs b/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs index e8d87193b1f..83f3b62bb63 100644 --- a/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs +++ b/src/Orleans.Runtime/Hosting/DefaultSiloServices.cs @@ -79,6 +79,8 @@ internal static void AddDefaultServices(ISiloBuilder builder) services.TryAddFromExisting(); services.AddSingleton(); services.AddFromExisting, SiloOptionsLogger>(); + services.AddSingleton(); + services.AddFromExisting, SiloControl>(); // Statistics services.AddSingleton(); @@ -90,8 +92,8 @@ internal static void AddDefaultServices(ISiloBuilder builder) services.TryAddSingleton(); - services.TryAddSingleton(); - services.TryAddSingleton(); + services.AddSingleton(); + services.AddSingleton(); services.TryAddSingleton(); @@ -150,19 +152,19 @@ internal static void AddDefaultServices(ISiloBuilder builder) services.TryAddSingleton(); - services.TryAddSingleton(); + services.AddSingleton(); services.AddFromExisting, DeploymentLoadPublisher>(); - services.TryAddSingleton(); - services.TryAddSingleton(); + services.AddSingleton(); + services.AddSingleton(); services.AddFromExisting(); services.AddFromExisting, MembershipTableManager>(); - services.TryAddSingleton(); + services.AddSingleton(); services.AddFromExisting(); services.AddFromExisting, MembershipSystemTarget>(); - services.TryAddSingleton(); - services.TryAddSingleton(); - services.TryAddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); services.TryAddFromExisting(); services.AddSingleton(); services.AddFromExisting, ClusterHealthMonitor>(); @@ -225,7 +227,7 @@ internal static void AddDefaultServices(ISiloBuilder builder) // Version selector strategy if (!services.Any(x => x.ServiceType == typeof(IVersionStore))) { - services.TryAddSingleton(); + services.AddSingleton(); services.AddFromExisting(); } services.AddFromExisting, GrainVersionStore>(); @@ -238,7 +240,7 @@ internal static void AddDefaultServices(ISiloBuilder builder) services.AddKeyedSingleton(typeof(AllCompatibleVersions)); // Compatibility - services.TryAddSingleton(); + services.AddSingleton(); // Compatability strategy services.AddKeyedSingleton(nameof(AllVersionsCompatible)); services.AddKeyedSingleton(nameof(BackwardCompatible)); @@ -251,11 +253,12 @@ internal static void AddDefaultServices(ISiloBuilder builder) services.TryAddSingleton>(sp => () => sp.GetRequiredService()); // Grain activation - services.TryAddSingleton(); - services.TryAddSingleton(); - services.TryAddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddFromExisting, Catalog>(); + services.AddSingleton(); services.AddSingleton(); - services.TryAddSingleton(); + services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); @@ -301,6 +304,8 @@ internal static void AddDefaultServices(ISiloBuilder builder) services.AddSingleton(); services.AddFromExisting(); services.AddFromExisting, ClusterManifestProvider>(); + services.AddSingleton(); + services.AddFromExisting, ClusterManifestSystemTarget>(); //Add default option formatter if none is configured, for options which are required to be configured services.ConfigureFormatter(); diff --git a/src/Orleans.Runtime/Manifest/ClusterManifestProvider.cs b/src/Orleans.Runtime/Manifest/ClusterManifestProvider.cs index ecdcd969d39..90b85d62043 100644 --- a/src/Orleans.Runtime/Manifest/ClusterManifestProvider.cs +++ b/src/Orleans.Runtime/Manifest/ClusterManifestProvider.cs @@ -203,8 +203,6 @@ private Task StartAsync(CancellationToken cancellationToken) private Task Initialize(CancellationToken cancellationToken) { _grainFactory = _services.GetRequiredService(); - var catalog = _services.GetRequiredService(); - catalog.RegisterSystemTarget(ActivatorUtilities.CreateInstance(_services)); return Task.CompletedTask; } diff --git a/src/Orleans.Runtime/MembershipService/MembershipSystemTarget.cs b/src/Orleans.Runtime/MembershipService/MembershipSystemTarget.cs index e3bdf840ee9..2c9bc6192c0 100644 --- a/src/Orleans.Runtime/MembershipService/MembershipSystemTarget.cs +++ b/src/Orleans.Runtime/MembershipService/MembershipSystemTarget.cs @@ -3,7 +3,6 @@ using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; -using Orleans.Internal; using Orleans.Runtime.Scheduler; namespace Orleans.Runtime.MembershipService @@ -16,17 +15,15 @@ internal sealed class MembershipSystemTarget : SystemTarget, IMembershipService, public MembershipSystemTarget( MembershipTableManager membershipTableManager, - ILocalSiloDetails localSiloDetails, - ILoggerFactory loggerFactory, ILogger log, IInternalGrainFactory grainFactory, - Catalog catalog) - : base(Constants.MembershipServiceType, localSiloDetails.SiloAddress, loggerFactory) + SystemTargetShared shared) + : base(Constants.MembershipServiceType, shared) { this.membershipTableManager = membershipTableManager; this.log = log; this.grainFactory = grainFactory; - catalog.RegisterSystemTarget(this); + shared.ActivationDirectory.RecordNewTarget(this); } public Task Ping(int pingNumber) => Task.CompletedTask; diff --git a/src/Orleans.Runtime/MembershipService/SiloMetadata/SiloMetadataSystemTarget.cs b/src/Orleans.Runtime/MembershipService/SiloMetadata/SiloMetadataSystemTarget.cs index b4fbb1e0090..df02aaa7ffc 100644 --- a/src/Orleans.Runtime/MembershipService/SiloMetadata/SiloMetadataSystemTarget.cs +++ b/src/Orleans.Runtime/MembershipService/SiloMetadata/SiloMetadataSystemTarget.cs @@ -8,27 +8,21 @@ #nullable enable namespace Orleans.Runtime.MembershipService.SiloMetadata; -internal sealed class SiloMetadataSystemTarget( - IOptions siloMetadata, - ILocalSiloDetails localSiloDetails, - ILoggerFactory loggerFactory, - IServiceProvider serviceProvider) - : SystemTarget(Constants.SiloMetadataType, localSiloDetails.SiloAddress, loggerFactory), ISiloMetadataSystemTarget, ILifecycleParticipant +internal sealed class SiloMetadataSystemTarget : SystemTarget, ISiloMetadataSystemTarget, ILifecycleParticipant { - private readonly SiloMetadata _siloMetadata = siloMetadata.Value; + private readonly SiloMetadata _siloMetadata; - public Task GetSiloMetadata() => Task.FromResult(_siloMetadata); + public SiloMetadataSystemTarget( + IOptions siloMetadata, + SystemTargetShared shared) : base(Constants.SiloMetadataType, shared) + { + _siloMetadata = siloMetadata.Value; + shared.ActivationDirectory.RecordNewTarget(this); + } + public Task GetSiloMetadata() => Task.FromResult(_siloMetadata); void ILifecycleParticipant.Participate(ISiloLifecycle lifecycle) { - lifecycle.Subscribe(nameof(SiloMetadataSystemTarget), ServiceLifecycleStage.RuntimeInitialize, OnRuntimeInitializeStart, OnRuntimeInitializeStop); - - Task OnRuntimeInitializeStart(CancellationToken token) - { - serviceProvider.GetRequiredService().RegisterSystemTarget(this); - return Task.CompletedTask; - } - - Task OnRuntimeInitializeStop(CancellationToken token) => Task.CompletedTask; + // We don't participate in any lifecycle stages: activating this instance is all that is necessary. } } \ No newline at end of file diff --git a/src/Orleans.Runtime/MembershipService/SystemTargetBasedMembershipTable.cs b/src/Orleans.Runtime/MembershipService/SystemTargetBasedMembershipTable.cs index d30ecc5a5e4..9bbf75f6cb3 100644 --- a/src/Orleans.Runtime/MembershipService/SystemTargetBasedMembershipTable.cs +++ b/src/Orleans.Runtime/MembershipService/SystemTargetBasedMembershipTable.cs @@ -37,13 +37,6 @@ private async Task GetMembershipTable() var siloDetails = this.serviceProvider.GetService(); bool isPrimarySilo = siloDetails.SiloAddress.Endpoint.Equals(options.PrimarySiloEndpoint); - if (isPrimarySilo) - { - LogInformationCreatingInMemoryMembershipTable(logger); - var catalog = serviceProvider.GetRequiredService(); - catalog.RegisterSystemTarget(ActivatorUtilities.CreateInstance(serviceProvider)); - } - var grainFactory = this.serviceProvider.GetRequiredService(); var result = grainFactory.GetSystemTarget(Constants.SystemMembershipTableType, SiloAddress.New(options.PrimarySiloEndpoint, 0)); if (isPrimarySilo) @@ -136,21 +129,20 @@ internal sealed partial class MembershipTableSystemTarget : SystemTarget, IMembe private readonly ILogger logger; public MembershipTableSystemTarget( - ILocalSiloDetails localSiloDetails, - ILoggerFactory loggerFactory, + ILogger logger, DeepCopier deepCopier, - Catalog catalog) - : base(CreateId(localSiloDetails), localSiloDetails.SiloAddress, loggerFactory) + SystemTargetShared shared) + : base(CreateId(shared.SiloAddress), shared) { - logger = loggerFactory.CreateLogger(); + this.logger = logger; table = new InMemoryMembershipTable(deepCopier); LogInformationGrainBasedMembershipTableActivated(logger); - catalog.RegisterSystemTarget(this); + shared.ActivationDirectory.RecordNewTarget(this); } - private static SystemTargetGrainId CreateId(ILocalSiloDetails localSiloDetails) + private static SystemTargetGrainId CreateId(SiloAddress siloAddress) { - return SystemTargetGrainId.Create(Constants.SystemMembershipTableType, SiloAddress.New(localSiloDetails.SiloAddress.Endpoint, 0)); + return SystemTargetGrainId.Create(Constants.SystemMembershipTableType, SiloAddress.New(siloAddress.Endpoint, 0)); } public Task InitializeMembershipTable(bool tryInitTableVersion) diff --git a/src/Orleans.Runtime/Placement/DeploymentLoadPublisher.cs b/src/Orleans.Runtime/Placement/DeploymentLoadPublisher.cs index 87c9c1a9ea5..26d7e928c0b 100644 --- a/src/Orleans.Runtime/Placement/DeploymentLoadPublisher.cs +++ b/src/Orleans.Runtime/Placement/DeploymentLoadPublisher.cs @@ -15,7 +15,7 @@ namespace Orleans.Runtime /// /// This class collects runtime statistics for all silos in the current deployment for use by placement. /// - internal partial class DeploymentLoadPublisher : SystemTarget, IDeploymentLoadPublisher, ISiloStatusListener, ILifecycleParticipant + internal sealed partial class DeploymentLoadPublisher : SystemTarget, IDeploymentLoadPublisher, ISiloStatusListener, ILifecycleParticipant { private readonly ILocalSiloDetails _siloDetails; private readonly ISiloStatusOracle _siloStatusOracle; @@ -46,8 +46,8 @@ public DeploymentLoadPublisher( IActivationWorkingSet activationWorkingSet, IEnvironmentStatisticsProvider environmentStatisticsProvider, IOptions loadSheddingOptions, - Catalog catalog) - : base(Constants.DeploymentLoadPublisherSystemTargetType, siloDetails.SiloAddress, loggerFactory) + SystemTargetShared shared) + : base(Constants.DeploymentLoadPublisherSystemTargetType, shared) { _logger = loggerFactory.CreateLogger(); _siloDetails = siloDetails; @@ -60,8 +60,8 @@ public DeploymentLoadPublisher( _statisticsRefreshTime = options.Value.DeploymentLoadPublisherRefreshTime; _periodicStats = new ConcurrentDictionary(); _siloStatisticsChangeListeners = new List(); - catalog.RegisterSystemTarget(this); siloStatusOracle.SubscribeToSiloStatusEvents(this); + shared.ActivationDirectory.RecordNewTarget(this); } private async Task StartAsync(CancellationToken cancellationToken) diff --git a/src/Orleans.Runtime/Placement/Rebalancing/ActivationRebalancerMonitor.cs b/src/Orleans.Runtime/Placement/Rebalancing/ActivationRebalancerMonitor.cs index 6b651005f0f..9c4df27e618 100644 --- a/src/Orleans.Runtime/Placement/Rebalancing/ActivationRebalancerMonitor.cs +++ b/src/Orleans.Runtime/Placement/Rebalancing/ActivationRebalancerMonitor.cs @@ -20,7 +20,6 @@ internal sealed partial class ActivationRebalancerMonitor : SystemTarget, IActiv private readonly TimeProvider _timeProvider; private readonly ActivationDirectory _activationDirectory; - private readonly ISiloStatusOracle _siloStatusOracle; private readonly IActivationRebalancerWorker _rebalancerGrain; private readonly ILogger _logger; private readonly List _statusListeners = []; @@ -29,24 +28,19 @@ internal sealed partial class ActivationRebalancerMonitor : SystemTarget, IActiv private readonly static TimeSpan TimerPeriod = 2 * IActivationRebalancerMonitor.WorkerReportPeriod; public ActivationRebalancerMonitor( - Catalog catalog, TimeProvider timeProvider, ActivationDirectory activationDirectory, ILoggerFactory loggerFactory, IGrainFactory grainFactory, - ILocalSiloDetails localSiloDetails, - ISiloStatusOracle siloStatusOracle) - : base(Constants.ActivationRebalancerMonitorType, localSiloDetails.SiloAddress, loggerFactory) + SystemTargetShared shared) + : base(Constants.ActivationRebalancerMonitorType, shared) { _timeProvider = timeProvider; _activationDirectory = activationDirectory; - _siloStatusOracle = siloStatusOracle; _logger = loggerFactory.CreateLogger(); _rebalancerGrain = grainFactory.GetGrain(0); _lastHeartbeatTimestamp = _timeProvider.GetTimestamp(); - catalog.RegisterSystemTarget(this); - _latestReport = new() { ClusterImbalance = 1, @@ -55,6 +49,7 @@ public ActivationRebalancerMonitor( SuspensionDuration = Timeout.InfiniteTimeSpan, Statistics = [] }; + shared.ActivationDirectory.RecordNewTarget(this); } public void Participate(ISiloLifecycle observer) diff --git a/src/Orleans.Runtime/Placement/Repartitioning/ActivationRepartitioner.MessageSink.cs b/src/Orleans.Runtime/Placement/Repartitioning/ActivationRepartitioner.MessageSink.cs index 92ee8db9fce..374eb34d557 100644 --- a/src/Orleans.Runtime/Placement/Repartitioning/ActivationRepartitioner.MessageSink.cs +++ b/src/Orleans.Runtime/Placement/Repartitioning/ActivationRepartitioner.MessageSink.cs @@ -1,7 +1,5 @@ #nullable enable using System; -using System.Diagnostics; -using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; @@ -10,7 +8,7 @@ namespace Orleans.Runtime.Placement.Repartitioning; -internal partial class ActivationRepartitioner : IMessageStatisticsSink +internal sealed partial class ActivationRepartitioner : IMessageStatisticsSink { private readonly CancellationTokenSource _shutdownCts = new(); diff --git a/src/Orleans.Runtime/Placement/Repartitioning/ActivationRepartitioner.cs b/src/Orleans.Runtime/Placement/Repartitioning/ActivationRepartitioner.cs index 43b311d085b..e24088f302e 100644 --- a/src/Orleans.Runtime/Placement/Repartitioning/ActivationRepartitioner.cs +++ b/src/Orleans.Runtime/Placement/Repartitioning/ActivationRepartitioner.cs @@ -43,17 +43,16 @@ internal sealed partial class ActivationRepartitioner : SystemTarget, IActivatio public ActivationRepartitioner( ISiloStatusOracle siloStatusOracle, - ILocalSiloDetails localSiloDetails, ILoggerFactory loggerFactory, IInternalGrainFactory internalGrainFactory, IRepartitionerMessageFilter messageFilter, IImbalanceToleranceRule toleranceRule, IActivationMigrationManager migrationManager, ActivationDirectory activationDirectory, - Catalog catalog, IOptions options, - TimeProvider timeProvider) - : base(Constants.ActivationRepartitionerType, localSiloDetails.SiloAddress, loggerFactory) + TimeProvider timeProvider, + SystemTargetShared shared) + : base(Constants.ActivationRepartitionerType, shared) { _logger = loggerFactory.CreateLogger(); _options = options.Value; @@ -71,7 +70,7 @@ public ActivationRepartitioner( null; _lastExchangedStopwatch = CoarseStopwatch.StartNew(); - catalog.RegisterSystemTarget(this); + shared.ActivationDirectory.RecordNewTarget(this); _siloStatusOracle.SubscribeToSiloStatusEvents(this); _timer = RegisterTimer(_ => TriggerExchangeRequest().AsTask(), null, Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); } diff --git a/src/Orleans.Runtime/Services/GrainService.cs b/src/Orleans.Runtime/Services/GrainService.cs index d8096161c58..df7b330e4d1 100644 --- a/src/Orleans.Runtime/Services/GrainService.cs +++ b/src/Orleans.Runtime/Services/GrainService.cs @@ -1,6 +1,7 @@ using System; using System.Threading; using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Orleans.Runtime.ConsistentRing; using Orleans.Runtime.Scheduler; @@ -39,17 +40,18 @@ protected GrainServiceStatus Status /// Only to make Reflection happy. Do not use it in your implementation [System.ComponentModel.EditorBrowsable(System.ComponentModel.EditorBrowsableState.Never)] + [Obsolete("Do not call the empty constructor.")] protected GrainService() : base() { throw new Exception("This should not be constructed by client code."); } /// Constructor to use for grain services - internal GrainService(GrainId grainId, SiloAddress siloAddress, ILoggerFactory loggerFactory, IConsistentRingProvider ringProvider) - : base(SystemTargetGrainId.Create(grainId.Type, siloAddress), siloAddress, loggerFactory: loggerFactory) + internal GrainService(GrainId grainId, IConsistentRingProvider ringProvider, SystemTargetShared shared) + : base(SystemTargetGrainId.Create(grainId.Type, shared.SiloAddress), shared) { typeName = this.GetType().FullName; - Logger = loggerFactory.CreateLogger(typeName); + Logger = shared.LoggerFactory.CreateLogger(typeName); ring = ringProvider; StoppedCancellationTokenSource = new CancellationTokenSource(); @@ -57,7 +59,7 @@ internal GrainService(GrainId grainId, SiloAddress siloAddress, ILoggerFactory l /// Constructor to use for grain services protected GrainService(GrainId grainId, Silo silo, ILoggerFactory loggerFactory) - : this(grainId, silo.SiloAddress, loggerFactory, silo.RingProvider) + : this(grainId, silo.RingProvider, silo.Services.GetRequiredService()) { } diff --git a/src/Orleans.Runtime/Silo/Silo.cs b/src/Orleans.Runtime/Silo/Silo.cs index c5267c2f868..2dfb0bd5240 100644 --- a/src/Orleans.Runtime/Silo/Silo.cs +++ b/src/Orleans.Runtime/Silo/Silo.cs @@ -9,7 +9,6 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Orleans.Runtime.ConsistentRing; -using Orleans.Runtime.GrainDirectory; using Orleans.Runtime.Messaging; using Orleans.Runtime.Scheduler; using Orleans.Services; @@ -31,12 +30,9 @@ public sealed partial class Silo : IAsyncDisposable, IDisposable private readonly ILogger logger; private readonly TaskCompletionSource siloTerminatedTask = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); private readonly InsideRuntimeClient runtimeClient; - private readonly SystemTarget fallbackScheduler; - private readonly ISiloStatusOracle siloStatusOracle; private readonly Watchdog platformWatchdog; private readonly TimeSpan waitForMessageToBeQueuedForOutbound; private readonly TimeSpan initTimeout; - private readonly Catalog catalog; private readonly object lockable = new object(); private readonly GrainFactory grainFactory; private readonly ISiloLifecycleSubject siloLifecycle; @@ -73,7 +69,6 @@ public Silo(ILocalSiloDetails siloDetails, IServiceProvider services) Services = services; RingProvider = services.GetRequiredService(); platformWatchdog = services.GetRequiredService(); - fallbackScheduler = services.GetRequiredService(); this.siloDetails = siloDetails; IOptions clusterMembershipOptions = services.GetRequiredService>(); @@ -124,10 +119,6 @@ public Silo(ILocalSiloDetails siloDetails, IServiceProvider services) messageCenter = Services.GetRequiredService(); messageCenter.SniffIncomingMessage = runtimeClient.SniffIncomingMessage; - catalog = Services.GetRequiredService(); - - siloStatusOracle = Services.GetRequiredService(); - this.SystemStatus = SystemStatus.Created; this.siloLifecycle = this.Services.GetRequiredService(); @@ -153,7 +144,6 @@ public async Task StartAsync(CancellationToken cancellationToken) { // SystemTarget for provider init calls this.lifecycleSchedulingSystemTarget = Services.GetRequiredService(); - catalog.RegisterSystemTarget(lifecycleSchedulingSystemTarget); try { @@ -166,21 +156,6 @@ public async Task StartAsync(CancellationToken cancellationToken) } } - private void CreateSystemTargets() - { - var siloControl = ActivatorUtilities.CreateInstance(Services); - catalog.RegisterSystemTarget(siloControl); - - } - - private void InjectDependencies() - { - catalog.SiloStatusOracle = this.siloStatusOracle; - - // SystemTarget for provider init calls - catalog.RegisterSystemTarget(fallbackScheduler); - } - private Task OnRuntimeInitializeStart(CancellationToken ct) { lock (lockable) @@ -215,13 +190,6 @@ private async Task StartAsyncTaskWithPerfAnalysis(string taskName, Func ta private Task OnRuntimeServicesStart(CancellationToken ct) { - //TODO: Setup all (or as many as possible) of the class started in this call to work directly with lifecycle - var stopWatch = Stopwatch.StartNew(); - - // This has to follow the above steps that start the runtime components - CreateSystemTargets(); - InjectDependencies(); - return Task.CompletedTask; } @@ -273,7 +241,8 @@ private async Task CreateGrainServices() private async Task RegisterGrainService(IGrainService service) { var grainService = (GrainService)service; - catalog.RegisterSystemTarget(grainService); + var activationDirectory = this.Services.GetRequiredService(); + activationDirectory.RecordNewTarget(grainService); grainServices.Add(grainService); try @@ -422,6 +391,7 @@ private async Task OnBecomeActiveStop(CancellationToken ct) { try { + var catalog = this.Services.GetRequiredService(); await catalog.DeactivateAllActivations(ct); } catch (Exception exception) @@ -711,20 +681,12 @@ private readonly struct SiloAddressConsistentHashCodeLogValue(SiloAddress siloAd } // A dummy system target for fallback scheduler - internal class FallbackSystemTarget : SystemTarget - { - public FallbackSystemTarget(ILocalSiloDetails localSiloDetails, ILoggerFactory loggerFactory) - : base(Constants.FallbackSystemTargetType, localSiloDetails.SiloAddress, loggerFactory) - { - } - } - - // A dummy system target for fallback scheduler - internal class LifecycleSchedulingSystemTarget : SystemTarget + internal sealed class LifecycleSchedulingSystemTarget : SystemTarget { - public LifecycleSchedulingSystemTarget(ILocalSiloDetails localSiloDetails, ILoggerFactory loggerFactory) - : base(Constants.LifecycleSchedulingSystemTargetType, localSiloDetails.SiloAddress, loggerFactory) + public LifecycleSchedulingSystemTarget(SystemTargetShared shared) + : base(Constants.LifecycleSchedulingSystemTargetType, shared) { + shared.ActivationDirectory.RecordNewTarget(this); } } } \ No newline at end of file diff --git a/src/Orleans.Runtime/Silo/SiloControl.cs b/src/Orleans.Runtime/Silo/SiloControl.cs index cbb985d48a6..a96347daf6e 100644 --- a/src/Orleans.Runtime/Silo/SiloControl.cs +++ b/src/Orleans.Runtime/Silo/SiloControl.cs @@ -24,13 +24,12 @@ namespace Orleans.Runtime { - internal partial class SiloControl : SystemTarget, ISiloControl + internal sealed partial class SiloControl : SystemTarget, ISiloControl, ILifecycleParticipant { private readonly ILogger logger; private readonly ILocalSiloDetails localSiloDetails; private readonly DeploymentLoadPublisher deploymentLoadPublisher; - private readonly Catalog catalog; private readonly CachedVersionSelectorManager cachedVersionSelectorManager; private readonly CompatibilityDirectorManager compatibilityDirectorManager; private readonly VersionSelectorManager selectorManager; @@ -50,7 +49,6 @@ internal partial class SiloControl : SystemTarget, ISiloControl public SiloControl( ILocalSiloDetails localSiloDetails, DeploymentLoadPublisher deploymentLoadPublisher, - Catalog catalog, CachedVersionSelectorManager cachedVersionSelectorManager, CompatibilityDirectorManager compatibilityDirectorManager, VersionSelectorManager selectorManager, @@ -64,14 +62,14 @@ public SiloControl( IOptions loadSheddingOptions, GrainCountStatistics grainCountStatistics, GrainPropertiesResolver grainPropertiesResolver, - GrainMigratabilityChecker migratabilityChecker) - : base(Constants.SiloControlType, localSiloDetails.SiloAddress, loggerFactory) + GrainMigratabilityChecker migratabilityChecker, + SystemTargetShared shared) + : base(Constants.SiloControlType, shared) { this.localSiloDetails = localSiloDetails; this.logger = loggerFactory.CreateLogger(); this.deploymentLoadPublisher = deploymentLoadPublisher; - this.catalog = catalog; this.cachedVersionSelectorManager = cachedVersionSelectorManager; this.compatibilityDirectorManager = compatibilityDirectorManager; this.selectorManager = selectorManager; @@ -84,6 +82,7 @@ public SiloControl( _grainCountStatistics = grainCountStatistics; this.grainPropertiesResolver = grainPropertiesResolver; _migratabilityChecker = migratabilityChecker; + shared.ActivationDirectory.RecordNewTarget(this); } public Task Ping(string message) @@ -225,7 +224,7 @@ public async Task GetDetailedGrainReport(GrainId grainId) public Task GetActivationCount() { - return Task.FromResult(this.catalog.ActivationCount); + return Task.FromResult(this.activationDirectory.Count); } public Task SendControlCommandToProvider(string providerName, int command, object arg) where T : IControllable @@ -342,6 +341,11 @@ private List GetDetailedGrainStatisticsCore(string[]? ty return stats; } + void ILifecycleParticipant.Participate(ISiloLifecycle lifecycle) + { + // Do nothing, just ensure that this instance is created so that it can register itself in the activation directory. + } + [LoggerMessage( Level = LogLevel.Information, Message = "Ping" diff --git a/src/Orleans.Runtime/Silo/TestHooks/TestHooksSystemTarget.cs b/src/Orleans.Runtime/Silo/TestHooks/TestHooksSystemTarget.cs index 03bb7156015..58339b80d86 100644 --- a/src/Orleans.Runtime/Silo/TestHooks/TestHooksSystemTarget.cs +++ b/src/Orleans.Runtime/Silo/TestHooks/TestHooksSystemTarget.cs @@ -25,7 +25,7 @@ internal class TestHooksEnvironmentStatisticsProvider : IEnvironmentStatisticsPr /// /// Test hook functions for white box testing implemented as a SystemTarget /// - internal class TestHooksSystemTarget : SystemTarget, ITestHooksSystemTarget + internal sealed class TestHooksSystemTarget : SystemTarget, ITestHooksSystemTarget { private readonly IServiceProvider serviceProvider; private readonly ISiloStatusOracle siloStatusOracle; @@ -42,14 +42,21 @@ public TestHooksSystemTarget( ILoggerFactory loggerFactory, ISiloStatusOracle siloStatusOracle, TestHooksEnvironmentStatisticsProvider environmentStatistics, - IOptions loadSheddingOptions) - : base(Constants.TestHooksSystemTargetType, siloDetails.SiloAddress, loggerFactory) + IOptions loadSheddingOptions, + SystemTargetShared shared) + : base(Constants.TestHooksSystemTargetType, shared) { this.serviceProvider = serviceProvider; this.siloStatusOracle = siloStatusOracle; this.environmentStatistics = environmentStatistics; this.loadSheddingOptions = loadSheddingOptions.Value; this.consistentRingProvider = this.serviceProvider.GetRequiredService(); + shared.ActivationDirectory.RecordNewTarget(this); + } + + public void Initialize() + { + // No-op } public Task GetConsistentRingPrimaryTargetSilo(uint key) diff --git a/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs b/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs index 46bc7a63323..db93df6a100 100644 --- a/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs +++ b/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs @@ -14,7 +14,7 @@ namespace Orleans.Streams { - internal partial class PersistentStreamPullingAgent : SystemTarget, IPersistentStreamPullingAgent + internal sealed partial class PersistentStreamPullingAgent : SystemTarget, IPersistentStreamPullingAgent { private const int ReadLoopRetryMax = 6; private const int StreamInactivityCheckFrequency = 10; @@ -44,18 +44,17 @@ internal partial class PersistentStreamPullingAgent : SystemTarget, IPersistentS internal PersistentStreamPullingAgent( SystemTargetGrainId id, string strProviderName, - ILoggerFactory loggerFactory, IStreamPubSub streamPubSub, IStreamFilter streamFilter, QueueId queueId, StreamPullingAgentOptions options, - SiloAddress siloAddress, IQueueAdapter queueAdapter, IQueueAdapterCache queueAdapterCache, IStreamFailureHandler streamFailureHandler, IBackoffProvider deliveryBackoffProvider, - IBackoffProvider queueReaderBackoffProvider) - : base(id, siloAddress, loggerFactory) + IBackoffProvider queueReaderBackoffProvider, + SystemTargetShared shared) + : base(id, shared) { if (strProviderName == null) throw new ArgumentNullException("runtime", "PersistentStreamPullingAgent: strProviderName should not be null"); @@ -72,8 +71,9 @@ internal PersistentStreamPullingAgent( this.queueReaderBackoffProvider = queueReaderBackoffProvider; numMessages = 0; - logger = loggerFactory.CreateLogger($"{this.GetType().Namespace}.{streamProviderName}"); + logger = shared.LoggerFactory.CreateLogger($"{this.GetType().Namespace}.{streamProviderName}"); LogInfoCreated(GetType().Name, GrainId, strProviderName, Silo, new(QueueId)); + shared.ActivationDirectory.RecordNewTarget(this); } /// diff --git a/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingManager.cs b/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingManager.cs index d5d3104cb56..5a3da16d3de 100644 --- a/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingManager.cs +++ b/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingManager.cs @@ -10,13 +10,12 @@ using Orleans.Internal; using System.Threading; using Orleans.Streams.Filtering; -using Microsoft.Extensions.DependencyInjection; using Orleans.Runtime.Scheduler; using System.Diagnostics.Metrics; namespace Orleans.Streams { - internal partial class PersistentStreamPullingManager : SystemTarget, IPersistentStreamPullingManager, IStreamQueueBalanceListener + internal sealed partial class PersistentStreamPullingManager : SystemTarget, IPersistentStreamPullingManager, IStreamQueueBalanceListener { private static readonly TimeSpan QUEUES_PRINT_PERIOD = TimeSpan.FromMinutes(5); @@ -24,11 +23,11 @@ internal partial class PersistentStreamPullingManager : SystemTarget, IPersisten private readonly Dictionary deactivatedAgents = new(); private readonly string streamProviderName; private readonly IStreamPubSub pubSub; + private readonly SystemTargetShared _systemTargetShared; private readonly StreamPullingAgentOptions options; private readonly AsyncSerialExecutor nonReentrancyGuarantor; // for non-reentrant execution of queue change notifications. private readonly ILogger logger; - private readonly ILoggerFactory loggerFactory; private int latestRingNotificationSequenceNumber; private int latestCommandNumber; private readonly IQueueAdapter queueAdapter; @@ -43,6 +42,7 @@ internal partial class PersistentStreamPullingManager : SystemTarget, IPersisten private int nextAgentId; private int NumberRunningAgents { get { return queuesToAgentsMap.Count; } } + internal PersistentStreamPullingManager( SystemTargetGrainId managerId, string strProviderName, @@ -51,12 +51,11 @@ internal PersistentStreamPullingManager( IStreamQueueBalancer streamQueueBalancer, IStreamFilter streamFilter, StreamPullingAgentOptions options, - ILoggerFactory loggerFactory, - SiloAddress siloAddress, IQueueAdapter queueAdapter, IBackoffProvider deliveryBackoffProvider, - IBackoffProvider queueReaderBackoffProvider) - : base(managerId, siloAddress, loggerFactory) + IBackoffProvider queueReaderBackoffProvider, + SystemTargetShared shared) + : base(managerId, shared) { if (string.IsNullOrWhiteSpace(strProviderName)) { @@ -86,11 +85,12 @@ internal PersistentStreamPullingManager( this.queueAdapter = queueAdapter ?? throw new ArgumentNullException(nameof(queueAdapter)); _deliveryBackoffProvider = deliveryBackoffProvider; _queueReaderBackoffProvider = queueReaderBackoffProvider; + _systemTargetShared = shared; queueAdapterCache = adapterFactory.GetQueueAdapterCache(); - logger = loggerFactory.CreateLogger($"{GetType().FullName}.{streamProviderName}"); + logger = shared.LoggerFactory.CreateLogger($"{GetType().FullName}.{streamProviderName}"); LogInfoCreated(GetType().Name, streamProviderName); - this.loggerFactory = loggerFactory; StreamInstruments.RegisterPersistentStreamPullingAgentsObserve(() => new Measurement(queuesToAgentsMap.Count, new KeyValuePair("name", streamProviderName))); + shared.ActivationDirectory.RecordNewTarget(this); } public async Task Initialize() @@ -238,8 +238,19 @@ private async Task AddNewQueues(IEnumerable myQueues, bool failOnInit) var agentIdNumber = Interlocked.Increment(ref nextAgentId); var agentId = SystemTargetGrainId.Create(Constants.StreamPullingAgentType, this.Silo, $"{streamProviderName}_{agentIdNumber}_{queueId:H}"); IStreamFailureHandler deliveryFailureHandler = await adapterFactory.GetDeliveryFailureHandler(queueId); - agent = new PersistentStreamPullingAgent(agentId, streamProviderName, this.loggerFactory, pubSub, streamFilter, queueId, this.options, this.Silo, queueAdapter, queueAdapterCache, deliveryFailureHandler, _deliveryBackoffProvider, _queueReaderBackoffProvider); - this.ActivationServices.GetRequiredService().RegisterSystemTarget(agent); + agent = new PersistentStreamPullingAgent( + agentId, + streamProviderName, + pubSub, + streamFilter, + queueId, + this.options, + queueAdapter, + queueAdapterCache, + deliveryFailureHandler, + _deliveryBackoffProvider, + _queueReaderBackoffProvider, + _systemTargetShared); queuesToAgentsMap.Add(queueId, agent); agents.Add(agent); } diff --git a/src/Orleans.Streaming/Providers/SiloStreamProviderRuntime.cs b/src/Orleans.Streaming/Providers/SiloStreamProviderRuntime.cs index 000227d9615..608ac8edbce 100644 --- a/src/Orleans.Streaming/Providers/SiloStreamProviderRuntime.cs +++ b/src/Orleans.Streaming/Providers/SiloStreamProviderRuntime.cs @@ -80,14 +80,10 @@ public async Task InitializePullingAgents( queueBalancer, filter, pullingAgentOptions, - this.loggerFactory, - this.siloDetails.SiloAddress, queueAdapter, deliveryProvider, - queueReaderProvider); - - var catalog = this.ServiceProvider.GetRequiredService(); - catalog.RegisterSystemTarget(manager); + queueReaderProvider, + ServiceProvider.GetRequiredService()); // Init the manager only after it was registered locally. var pullingAgentManager = manager.AsReference(); diff --git a/src/Orleans.TestingHost/InProcTestCluster.cs b/src/Orleans.TestingHost/InProcTestCluster.cs index 69b05dc1c18..134fa06dad0 100644 --- a/src/Orleans.TestingHost/InProcTestCluster.cs +++ b/src/Orleans.TestingHost/InProcTestCluster.cs @@ -815,8 +815,6 @@ private static void TryConfigureFileLogging(InProcessTestClusterOptions options, private static void InitializeTestHooksSystemTarget(IHost host) { - var testHook = host.Services.GetRequiredService(); - var catalog = host.Services.GetRequiredService(); - catalog.RegisterSystemTarget(testHook); + _ = host.Services.GetRequiredService(); } } diff --git a/src/Orleans.TestingHost/TestClusterHostFactory.cs b/src/Orleans.TestingHost/TestClusterHostFactory.cs index 46a3d1c6e55..c175ef77c72 100644 --- a/src/Orleans.TestingHost/TestClusterHostFactory.cs +++ b/src/Orleans.TestingHost/TestClusterHostFactory.cs @@ -190,9 +190,7 @@ private static void TryConfigureFileLogging(IConfiguration configuration, IServi private static void InitializeTestHooksSystemTarget(IHost host) { - var testHook = host.Services.GetRequiredService(); - var catalog = host.Services.GetRequiredService(); - catalog.RegisterSystemTarget(testHook); + _ = host.Services.GetRequiredService(); } } } diff --git a/test/NonSilo.Tests/Directory/ClientDirectoryTests.cs b/test/NonSilo.Tests/Directory/ClientDirectoryTests.cs index 59979ef26d6..e24b1feb773 100644 --- a/test/NonSilo.Tests/Directory/ClientDirectoryTests.cs +++ b/test/NonSilo.Tests/Directory/ClientDirectoryTests.cs @@ -1,6 +1,7 @@ using System.Collections.Concurrent; using System.Collections.Immutable; using System.Threading.Channels; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; @@ -11,6 +12,7 @@ using Orleans.Runtime; using Orleans.Runtime.GrainDirectory; using Orleans.Runtime.Messaging; +using Orleans.Runtime.Scheduler; using UnitTests.Directory; using Xunit; @@ -71,6 +73,14 @@ public ClientDirectoryTests() _grainFactory = Substitute.For(); _grainFactory.GetSystemTarget(default, default) .ReturnsForAnyArgs(info => _remoteDirectories.GetOrAdd(info.ArgAt(1), k => Substitute.For())); + var systemTargetShared = new SystemTargetShared( + runtimeClient: null!, + localSiloDetails: _localSiloDetails, + loggerFactory: _loggerFactory, + schedulingOptions: Options.Create(new SchedulingOptions()), + grainReferenceActivator: null, + timerRegistry: null, + activations: new ActivationDirectory()); _directory = new ClientDirectory( grainFactory: _grainFactory, @@ -80,7 +90,7 @@ public ClientDirectoryTests() clusterMembershipService: _clusterMembershipService, timerFactory: _timerFactory, connectedClients: _connectedClientCollection, - catalog: null); + shared: systemTargetShared); _testAccessor = new ClientDirectory.TestAccessor(_directory); // Disable automatic publishing to simplify testing. diff --git a/test/Tester/GrainServiceTests/TestGrainService.cs b/test/Tester/GrainServiceTests/TestGrainService.cs index 516bac34c7b..862a59e80ce 100644 --- a/test/Tester/GrainServiceTests/TestGrainService.cs +++ b/test/Tester/GrainServiceTests/TestGrainService.cs @@ -44,7 +44,7 @@ public Task EchoViaExtension(string what) } } - public class TestGrainService : GrainService, ITestGrainService + public sealed class TestGrainService : GrainService, ITestGrainService { private readonly TestGrainServiceOptions config;