Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/Orleans.Core/Runtime/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ internal static class Constants
public static readonly GrainType CatalogType = SystemTargetGrainId.CreateGrainType("catalog");
public static readonly GrainType MembershipServiceType = SystemTargetGrainId.CreateGrainType("clustering");
public static readonly GrainType SystemMembershipTableType = SystemTargetGrainId.CreateGrainType("clustering.dev");
public static readonly GrainType FallbackSystemTargetType = SystemTargetGrainId.CreateGrainType("fallback");
public static readonly GrainType LifecycleSchedulingSystemTargetType = SystemTargetGrainId.CreateGrainType("lifecycle");
public static readonly GrainType DeploymentLoadPublisherSystemTargetType = SystemTargetGrainId.CreateGrainType("load-publisher");
public static readonly GrainType TestHooksSystemTargetType = SystemTargetGrainId.CreateGrainType("test.hooks");
Expand Down Expand Up @@ -47,7 +46,6 @@ internal static class Constants
{ClientDirectoryType, "ClientDirectory"},
{CatalogType,"Catalog"},
{MembershipServiceType,"MembershipService"},
{FallbackSystemTargetType, "FallbackSystemTarget"},
{LifecycleSchedulingSystemTargetType, "LifecycleSchedulingSystemTarget"},
{DeploymentLoadPublisherSystemTargetType, "DeploymentLoadPublisherSystemTarget"},
{StreamProviderManagerAgentSystemTargetType,"StreamProviderManagerAgent"},
Expand Down
15 changes: 6 additions & 9 deletions src/Orleans.Reminders/ReminderService/LocalReminderService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,15 @@ internal sealed class LocalReminderService : GrainService, IReminderService, ILi
public LocalReminderService(
GrainReferenceActivator referenceActivator,
GrainInterfaceTypeResolver interfaceTypeResolver,
ILocalSiloDetails localSiloDetails,
IReminderTable reminderTable,
ILoggerFactory loggerFactory,
IAsyncTimerFactory asyncTimerFactory,
IOptions<ReminderOptions> reminderOptions,
IConsistentRingProvider ringProvider,
Catalog catalog)
SystemTargetShared shared)
: base(
SystemTargetGrainId.CreateGrainServiceGrainId(GrainInterfaceUtils.GetGrainClassTypeCode(typeof(IReminderService)), null, localSiloDetails.SiloAddress),
localSiloDetails.SiloAddress,
loggerFactory,
ringProvider)
SystemTargetGrainId.CreateGrainServiceGrainId(GrainInterfaceUtils.GetGrainClassTypeCode(typeof(IReminderService)), null, shared.SiloAddress),
ringProvider,
shared)
{
_referenceActivator = referenceActivator;
_grainInterfaceType = interfaceTypeResolver.GetGrainInterfaceType(typeof(IRemindable));
Expand All @@ -57,9 +54,9 @@ public LocalReminderService(
this.asyncTimerFactory = asyncTimerFactory;
ReminderInstruments.RegisterActiveRemindersObserve(() => localReminders.Count);
startedTask = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
this.logger = loggerFactory.CreateLogger<LocalReminderService>();
this.logger = shared.LoggerFactory.CreateLogger<LocalReminderService>();
this.listRefreshTimer = asyncTimerFactory.Create(this.reminderOptions.RefreshReminderListPeriod, "ReminderService.ReminderListRefresher");
catalog.RegisterSystemTarget(this);
shared.ActivationDirectory.RecordNewTarget(this);
}

void ILifecycleParticipant<ISiloLifecycle>.Participate(ISiloLifecycle observer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,17 @@ public GrainCallCancellationManager(
IServiceProvider serviceProvider,
Catalog catalog,
ActivationDirectory activationDirectory,
IClusterMembershipService clusterMembershipService) : base(Constants.CancellationManagerType, localSiloDetails.SiloAddress, loggerFactory)
IClusterMembershipService clusterMembershipService,
SystemTargetShared shared) : base(Constants.CancellationManagerType, shared)
{
_serviceProvider = serviceProvider;
_logger = loggerFactory.CreateLogger<GrainCallCancellationManager>();
_catalog = catalog;
_activationDirectory = activationDirectory;
_clusterMembershipService = clusterMembershipService;
_catalog.RegisterSystemTarget(this);

using (new ExecutionContextSuppressor())
{
using var _ = new ExecutionContextSuppressor();
_membershipUpdatesTask = Task.Factory.StartNew(
state => ((GrainCallCancellationManager)state!).ProcessMembershipUpdates(),
this,
Expand All @@ -80,6 +80,8 @@ public GrainCallCancellationManager(
WorkItemGroup.TaskScheduler).Unwrap();
_membershipUpdatesTask.Ignore();
}

shared.ActivationDirectory.RecordNewTarget(this);
}

private IInternalGrainFactory GrainFactory => _grainFactory ??= _serviceProvider.GetRequiredService<IInternalGrainFactory>();
Expand Down
7 changes: 4 additions & 3 deletions src/Orleans.Runtime/Catalog/ActivationMigrationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ internal interface IActivationMigrationManager
/// <summary>
/// Migrates grain activations to target hosts and handles migration requests from other hosts.
/// </summary>
internal partial class ActivationMigrationManager : SystemTarget, IActivationMigrationManagerSystemTarget, IActivationMigrationManager, ILifecycleParticipant<ISiloLifecycle>
internal sealed partial class ActivationMigrationManager : SystemTarget, IActivationMigrationManagerSystemTarget, IActivationMigrationManager, ILifecycleParticipant<ISiloLifecycle>
{
private const int MaxBatchSize = 1_000;
private readonly ConcurrentDictionary<SiloAddress, (Task PumpTask, Channel<MigrationWorkItem> WorkItemChannel)> _workers = new();
Expand All @@ -74,13 +74,14 @@ public ActivationMigrationManager(
ILoggerFactory loggerFactory,
IInternalGrainFactory grainFactory,
Catalog catalog,
IClusterMembershipService clusterMembershipService) : base(Constants.ActivationMigratorType, localSiloDetails.SiloAddress, loggerFactory)
SystemTargetShared shared,
IClusterMembershipService clusterMembershipService) : base(Constants.ActivationMigratorType, shared)
{
_grainFactory = grainFactory;
_logger = loggerFactory.CreateLogger<ActivationMigrationManager>();
_catalog = catalog;
_clusterMembershipService = clusterMembershipService;
_catalog.RegisterSystemTarget(this);
shared.ActivationDirectory.RecordNewTarget(this);

{
using var _ = new ExecutionContextSuppressor();
Expand Down
62 changes: 17 additions & 45 deletions src/Orleans.Runtime/Catalog/Catalog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,21 @@
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Orleans.Configuration;
using Orleans.GrainDirectory;
using Orleans.Runtime.GrainDirectory;
using Orleans.Runtime.Scheduler;

namespace Orleans.Runtime
{
internal sealed partial class Catalog : SystemTarget, ICatalog
internal sealed partial class Catalog : SystemTarget, ICatalog, ILifecycleParticipant<ISiloLifecycle>
{
public SiloAddress LocalSilo { get; private set; }
internal ISiloStatusOracle SiloStatusOracle { get; set; }
private readonly SiloAddress _siloAddress;
private readonly ActivationCollector activationCollector;
private readonly GrainDirectoryResolver grainDirectoryResolver;
private readonly ActivationDirectory activations;
private readonly IServiceProvider serviceProvider;
private readonly ILogger logger;
private readonly GrainContextActivator grainActivator;
private ISiloStatusOracle _siloStatusOracle;

public Catalog(
ILocalSiloDetails localSiloDetails,
Expand All @@ -31,17 +28,17 @@ public Catalog(
ActivationCollector activationCollector,
IServiceProvider serviceProvider,
ILoggerFactory loggerFactory,
GrainContextActivator grainActivator)
: base(Constants.CatalogType, localSiloDetails.SiloAddress, loggerFactory)
GrainContextActivator grainActivator,
SystemTargetShared shared)
: base(Constants.CatalogType, shared)
{
this.LocalSilo = localSiloDetails.SiloAddress;
this._siloAddress = localSiloDetails.SiloAddress;
this.grainDirectoryResolver = grainDirectoryResolver;
this.activations = activationDirectory;
this.serviceProvider = serviceProvider;
this.grainActivator = grainActivator;
this.logger = loggerFactory.CreateLogger<Catalog>();
this.activationCollector = activationCollector;
this.RuntimeClient = serviceProvider.GetRequiredService<InsideRuntimeClient>();

GC.GetTotalMemory(true); // need to call once w/true to ensure false returns OK value

Expand All @@ -58,7 +55,7 @@ public Catalog(

return counter;
});
RegisterSystemTarget(this);
shared.ActivationDirectory.RecordNewTarget(this);
}

/// <summary>
Expand Down Expand Up @@ -94,29 +91,6 @@ internal int UnregisterGrainForTesting(GrainId grain)
return 1;
}

public void RegisterSystemTarget(ISystemTarget target)
{
var systemTarget = target as SystemTarget;
if (systemTarget == null) throw new ArgumentException($"Parameter must be of type {typeof(SystemTarget)}", nameof(target));
systemTarget.RuntimeClient = this.RuntimeClient;
var sp = this.serviceProvider;
systemTarget.WorkItemGroup = new WorkItemGroup(
systemTarget,
sp.GetRequiredService<ILogger<WorkItemGroup>>(),
sp.GetRequiredService<ILogger<ActivationTaskScheduler>>(),
sp.GetRequiredService<IOptions<SchedulingOptions>>());
activations.RecordNewTarget(systemTarget);
}

public void UnregisterSystemTarget(ISystemTarget target)
{
var systemTarget = target as SystemTarget;
if (systemTarget == null) throw new ArgumentException($"Parameter must be of type {typeof(SystemTarget)}", nameof(target));
activations.RemoveTarget(systemTarget);
}

public int ActivationCount { get { return activations.Count; } }

/// <summary>
/// If activation already exists, return it.
/// Otherwise, creates a new activation, begins rehydrating it and activating it, then returns it.
Expand Down Expand Up @@ -154,7 +128,7 @@ public IGrainContext GetOrCreateActivation(
return result;
}

if (!SiloStatusOracle.CurrentStatus.IsTerminating())
if (!_siloStatusOracle.CurrentStatus.IsTerminating())
{
var address = new GrainAddress
{
Expand Down Expand Up @@ -191,7 +165,7 @@ public IGrainContext GetOrCreateActivation(
static IGrainContext UnableToCreateActivation(Catalog self, GrainId grainId)
{
// Did not find and did not start placing new
var isTerminating = self.SiloStatusOracle.CurrentStatus.IsTerminating();
var isTerminating = self._siloStatusOracle.CurrentStatus.IsTerminating();
if (isTerminating)
{
self.LogDebugUnableToCreateActivationTerminating(grainId);
Expand Down Expand Up @@ -292,14 +266,6 @@ await Parallel.ForEachAsync(activations, options, (kv, _) =>
}).WaitAsync(cancellationToken);
}

public SiloStatus LocalSiloStatus
{
get
{
return SiloStatusOracle.CurrentStatus;
}
}

public async Task DeleteActivations(List<GrainAddress> addresses, DeactivationReasonCode reasonCode, string reasonText)
{
var tasks = new List<Task>(addresses.Count);
Expand All @@ -320,7 +286,7 @@ await Parallel.ForEachAsync(addresses, (activationAddress, cancellationToken) =>
internal void OnSiloStatusChange(ILocalGrainDirectory directory, SiloAddress updatedSilo, SiloStatus status)
{
// ignore joining events and also events on myself.
if (updatedSilo.Equals(LocalSilo)) return;
if (updatedSilo.Equals(_siloAddress)) return;

// We deactivate those activations when silo goes either of ShuttingDown/Stopping/Dead states,
// since this is what Directory is doing as well. Directory removes a silo based on all those 3 statuses,
Expand Down Expand Up @@ -386,6 +352,12 @@ void StartDeactivatingActivations(DeactivationReason reason, List<IGrainContext>
}
}

void ILifecycleParticipant<ISiloLifecycle>.Participate(ISiloLifecycle lifecycle)
{
// Do nothing, just ensure that this instance is created so that it can register itself in the activation directory.
_siloStatusOracle = serviceProvider.GetRequiredService<ISiloStatusOracle>();
}

private readonly struct SiloAddressLogValue(SiloAddress silo)
{
public override string ToString() => silo.ToStringWithHashCode();
Expand Down
42 changes: 42 additions & 0 deletions src/Orleans.Runtime/Catalog/SystemTargetShared.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using System;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Orleans.Configuration;
using Orleans.GrainReferences;
using Orleans.Runtime.Scheduler;
using Orleans.Timers;

namespace Orleans.Runtime
{
internal sealed class SystemTargetShared(
InsideRuntimeClient runtimeClient,
ILocalSiloDetails localSiloDetails,
ILoggerFactory loggerFactory,
ILogger<WorkItemGroup> workItemGroupLogger,
ILogger<ActivationTaskScheduler> activationTaskSchedulerLogger,
IOptions<SchedulingOptions> schedulingOptions,
GrainReferenceActivator grainReferenceActivator,
RuntimeMessagingTrace messagingTrace,
ITimerRegistry timerRegistry,
ActivationDirectory activations)
{
public SiloAddress SiloAddress => localSiloDetails.SiloAddress;

public ILoggerFactory LoggerFactory => loggerFactory;
public GrainReferenceActivator GrainReferenceActivator => grainReferenceActivator;
public ITimerRegistry TimerRegistry => timerRegistry;

public RuntimeMessagingTrace MessagingTrace => messagingTrace;
public InsideRuntimeClient RuntimeClient => runtimeClient;
public ActivationDirectory ActivationDirectory => activations;
public WorkItemGroup CreateWorkItemGroup(SystemTarget systemTarget)
{
ArgumentNullException.ThrowIfNull(systemTarget);
return new WorkItemGroup(
systemTarget,
workItemGroupLogger,
activationTaskSchedulerLogger,
schedulingOptions);
}
}
}
Loading
Loading