Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/Orleans.Core/Runtime/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ internal static class Constants
public static readonly GrainType CatalogType = SystemTargetGrainId.CreateGrainType("catalog");
public static readonly GrainType MembershipServiceType = SystemTargetGrainId.CreateGrainType("clustering");
public static readonly GrainType SystemMembershipTableType = SystemTargetGrainId.CreateGrainType("clustering.dev");
public static readonly GrainType FallbackSystemTargetType = SystemTargetGrainId.CreateGrainType("fallback");
public static readonly GrainType LifecycleSchedulingSystemTargetType = SystemTargetGrainId.CreateGrainType("lifecycle");
public static readonly GrainType DeploymentLoadPublisherSystemTargetType = SystemTargetGrainId.CreateGrainType("load-publisher");
public static readonly GrainType TestHooksSystemTargetType = SystemTargetGrainId.CreateGrainType("test.hooks");
Expand Down Expand Up @@ -47,7 +46,6 @@ internal static class Constants
{ClientDirectoryType, "ClientDirectory"},
{CatalogType,"Catalog"},
{MembershipServiceType,"MembershipService"},
{FallbackSystemTargetType, "FallbackSystemTarget"},
{LifecycleSchedulingSystemTargetType, "LifecycleSchedulingSystemTarget"},
{DeploymentLoadPublisherSystemTargetType, "DeploymentLoadPublisherSystemTarget"},
{StreamProviderManagerAgentSystemTargetType,"StreamProviderManagerAgent"},
Expand Down
15 changes: 6 additions & 9 deletions src/Orleans.Reminders/ReminderService/LocalReminderService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,15 @@ internal sealed class LocalReminderService : GrainService, IReminderService, ILi
public LocalReminderService(
GrainReferenceActivator referenceActivator,
GrainInterfaceTypeResolver interfaceTypeResolver,
ILocalSiloDetails localSiloDetails,
IReminderTable reminderTable,
ILoggerFactory loggerFactory,
IAsyncTimerFactory asyncTimerFactory,
IOptions<ReminderOptions> reminderOptions,
IConsistentRingProvider ringProvider,
Catalog catalog)
SystemTargetShared shared)
: base(
SystemTargetGrainId.CreateGrainServiceGrainId(GrainInterfaceUtils.GetGrainClassTypeCode(typeof(IReminderService)), null, localSiloDetails.SiloAddress),
localSiloDetails.SiloAddress,
loggerFactory,
ringProvider)
SystemTargetGrainId.CreateGrainServiceGrainId(GrainInterfaceUtils.GetGrainClassTypeCode(typeof(IReminderService)), null, shared.SiloAddress),
ringProvider,
shared)
{
_referenceActivator = referenceActivator;
_grainInterfaceType = interfaceTypeResolver.GetGrainInterfaceType(typeof(IRemindable));
Expand All @@ -57,9 +54,9 @@ public LocalReminderService(
this.asyncTimerFactory = asyncTimerFactory;
ReminderInstruments.RegisterActiveRemindersObserve(() => localReminders.Count);
startedTask = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
this.logger = loggerFactory.CreateLogger<LocalReminderService>();
this.logger = shared.LoggerFactory.CreateLogger<LocalReminderService>();
this.listRefreshTimer = asyncTimerFactory.Create(this.reminderOptions.RefreshReminderListPeriod, "ReminderService.ReminderListRefresher");
catalog.RegisterSystemTarget(this);
shared.ActivationDirectory.RecordNewTarget(this);
}

void ILifecycleParticipant<ISiloLifecycle>.Participate(ISiloLifecycle observer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,17 @@ public GrainCallCancellationManager(
IServiceProvider serviceProvider,
Catalog catalog,
ActivationDirectory activationDirectory,
IClusterMembershipService clusterMembershipService) : base(Constants.CancellationManagerType, localSiloDetails.SiloAddress, loggerFactory)
IClusterMembershipService clusterMembershipService,
SystemTargetShared shared) : base(Constants.CancellationManagerType, shared)
{
_serviceProvider = serviceProvider;
_logger = loggerFactory.CreateLogger<GrainCallCancellationManager>();
_catalog = catalog;
_activationDirectory = activationDirectory;
_clusterMembershipService = clusterMembershipService;
_catalog.RegisterSystemTarget(this);

using (new ExecutionContextSuppressor())
{
using var _ = new ExecutionContextSuppressor();
_membershipUpdatesTask = Task.Factory.StartNew(
state => ((GrainCallCancellationManager)state!).ProcessMembershipUpdates(),
this,
Expand All @@ -80,6 +80,8 @@ public GrainCallCancellationManager(
WorkItemGroup.TaskScheduler).Unwrap();
_membershipUpdatesTask.Ignore();
}

shared.ActivationDirectory.RecordNewTarget(this);
}

private IInternalGrainFactory GrainFactory => _grainFactory ??= _serviceProvider.GetRequiredService<IInternalGrainFactory>();
Expand Down
7 changes: 4 additions & 3 deletions src/Orleans.Runtime/Catalog/ActivationMigrationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ internal interface IActivationMigrationManager
/// <summary>
/// Migrates grain activations to target hosts and handles migration requests from other hosts.
/// </summary>
internal partial class ActivationMigrationManager : SystemTarget, IActivationMigrationManagerSystemTarget, IActivationMigrationManager, ILifecycleParticipant<ISiloLifecycle>
internal sealed partial class ActivationMigrationManager : SystemTarget, IActivationMigrationManagerSystemTarget, IActivationMigrationManager, ILifecycleParticipant<ISiloLifecycle>
{
private const int MaxBatchSize = 1_000;
private readonly ConcurrentDictionary<SiloAddress, (Task PumpTask, Channel<MigrationWorkItem> WorkItemChannel)> _workers = new();
Expand All @@ -74,13 +74,14 @@ public ActivationMigrationManager(
ILoggerFactory loggerFactory,
IInternalGrainFactory grainFactory,
Catalog catalog,
IClusterMembershipService clusterMembershipService) : base(Constants.ActivationMigratorType, localSiloDetails.SiloAddress, loggerFactory)
SystemTargetShared shared,
IClusterMembershipService clusterMembershipService) : base(Constants.ActivationMigratorType, shared)
{
_grainFactory = grainFactory;
_logger = loggerFactory.CreateLogger<ActivationMigrationManager>();
_catalog = catalog;
_clusterMembershipService = clusterMembershipService;
_catalog.RegisterSystemTarget(this);
shared.ActivationDirectory.RecordNewTarget(this);

{
using var _ = new ExecutionContextSuppressor();
Expand Down
62 changes: 17 additions & 45 deletions src/Orleans.Runtime/Catalog/Catalog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,21 @@
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Orleans.Configuration;
using Orleans.GrainDirectory;
using Orleans.Runtime.GrainDirectory;
using Orleans.Runtime.Scheduler;

namespace Orleans.Runtime
{
internal sealed partial class Catalog : SystemTarget, ICatalog
internal sealed partial class Catalog : SystemTarget, ICatalog, ILifecycleParticipant<ISiloLifecycle>
{
public SiloAddress LocalSilo { get; private set; }
internal ISiloStatusOracle SiloStatusOracle { get; set; }
private readonly SiloAddress _siloAddress;
private readonly ActivationCollector activationCollector;
private readonly GrainDirectoryResolver grainDirectoryResolver;
private readonly ActivationDirectory activations;
private readonly IServiceProvider serviceProvider;
private readonly ILogger logger;
private readonly GrainContextActivator grainActivator;
private ISiloStatusOracle _siloStatusOracle;

public Catalog(
ILocalSiloDetails localSiloDetails,
Expand All @@ -31,17 +28,17 @@ public Catalog(
ActivationCollector activationCollector,
IServiceProvider serviceProvider,
ILoggerFactory loggerFactory,
GrainContextActivator grainActivator)
: base(Constants.CatalogType, localSiloDetails.SiloAddress, loggerFactory)
GrainContextActivator grainActivator,
SystemTargetShared shared)
: base(Constants.CatalogType, shared)
{
this.LocalSilo = localSiloDetails.SiloAddress;
this._siloAddress = localSiloDetails.SiloAddress;
this.grainDirectoryResolver = grainDirectoryResolver;
this.activations = activationDirectory;
this.serviceProvider = serviceProvider;
this.grainActivator = grainActivator;
this.logger = loggerFactory.CreateLogger<Catalog>();
this.activationCollector = activationCollector;
this.RuntimeClient = serviceProvider.GetRequiredService<InsideRuntimeClient>();

GC.GetTotalMemory(true); // need to call once w/true to ensure false returns OK value

Expand All @@ -58,7 +55,7 @@ public Catalog(

return counter;
});
RegisterSystemTarget(this);
shared.ActivationDirectory.RecordNewTarget(this);
}

/// <summary>
Expand Down Expand Up @@ -94,29 +91,6 @@ internal int UnregisterGrainForTesting(GrainId grain)
return 1;
}

public void RegisterSystemTarget(ISystemTarget target)
{
var systemTarget = target as SystemTarget;
if (systemTarget == null) throw new ArgumentException($"Parameter must be of type {typeof(SystemTarget)}", nameof(target));
systemTarget.RuntimeClient = this.RuntimeClient;
var sp = this.serviceProvider;
systemTarget.WorkItemGroup = new WorkItemGroup(
systemTarget,
sp.GetRequiredService<ILogger<WorkItemGroup>>(),
sp.GetRequiredService<ILogger<ActivationTaskScheduler>>(),
sp.GetRequiredService<IOptions<SchedulingOptions>>());
activations.RecordNewTarget(systemTarget);
}

public void UnregisterSystemTarget(ISystemTarget target)
{
var systemTarget = target as SystemTarget;
if (systemTarget == null) throw new ArgumentException($"Parameter must be of type {typeof(SystemTarget)}", nameof(target));
activations.RemoveTarget(systemTarget);
}

public int ActivationCount { get { return activations.Count; } }

/// <summary>
/// If activation already exists, return it.
/// Otherwise, creates a new activation, begins rehydrating it and activating it, then returns it.
Expand Down Expand Up @@ -154,7 +128,7 @@ public IGrainContext GetOrCreateActivation(
return result;
}

if (!SiloStatusOracle.CurrentStatus.IsTerminating())
if (!_siloStatusOracle.CurrentStatus.IsTerminating())
{
var address = new GrainAddress
{
Expand Down Expand Up @@ -191,7 +165,7 @@ public IGrainContext GetOrCreateActivation(
static IGrainContext UnableToCreateActivation(Catalog self, GrainId grainId)
{
// Did not find and did not start placing new
var isTerminating = self.SiloStatusOracle.CurrentStatus.IsTerminating();
var isTerminating = self._siloStatusOracle.CurrentStatus.IsTerminating();
if (isTerminating)
{
self.LogDebugUnableToCreateActivationTerminating(grainId);
Expand Down Expand Up @@ -292,14 +266,6 @@ await Parallel.ForEachAsync(activations, options, (kv, _) =>
}).WaitAsync(cancellationToken);
}

public SiloStatus LocalSiloStatus
{
get
{
return SiloStatusOracle.CurrentStatus;
}
}

public async Task DeleteActivations(List<GrainAddress> addresses, DeactivationReasonCode reasonCode, string reasonText)
{
var tasks = new List<Task>(addresses.Count);
Expand All @@ -320,7 +286,7 @@ await Parallel.ForEachAsync(addresses, (activationAddress, cancellationToken) =>
internal void OnSiloStatusChange(ILocalGrainDirectory directory, SiloAddress updatedSilo, SiloStatus status)
{
// ignore joining events and also events on myself.
if (updatedSilo.Equals(LocalSilo)) return;
if (updatedSilo.Equals(_siloAddress)) return;

// We deactivate those activations when silo goes either of ShuttingDown/Stopping/Dead states,
// since this is what Directory is doing as well. Directory removes a silo based on all those 3 statuses,
Expand Down Expand Up @@ -386,6 +352,12 @@ void StartDeactivatingActivations(DeactivationReason reason, List<IGrainContext>
}
}

void ILifecycleParticipant<ISiloLifecycle>.Participate(ISiloLifecycle lifecycle)
{
// Do nothing, just ensure that this instance is created so that it can register itself in the activation directory.
_siloStatusOracle = serviceProvider.GetRequiredService<ISiloStatusOracle>();
}

private readonly struct SiloAddressLogValue(SiloAddress silo)
{
public override string ToString() => silo.ToStringWithHashCode();
Expand Down
41 changes: 41 additions & 0 deletions src/Orleans.Runtime/Catalog/SystemTargetShared.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#nullable enable
using System;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Orleans.Configuration;
using Orleans.GrainReferences;
using Orleans.Runtime.Scheduler;
using Orleans.Timers;

namespace Orleans.Runtime;

internal sealed class SystemTargetShared(
InsideRuntimeClient runtimeClient,
ILocalSiloDetails localSiloDetails,
ILoggerFactory loggerFactory,
IOptions<SchedulingOptions> schedulingOptions,
GrainReferenceActivator grainReferenceActivator,
ITimerRegistry timerRegistry,
ActivationDirectory activations)
{
private readonly ILogger<WorkItemGroup> _workItemGroupLogger = loggerFactory.CreateLogger<WorkItemGroup>();
private readonly ILogger<ActivationTaskScheduler> _activationTaskSchedulerLogger = loggerFactory.CreateLogger<ActivationTaskScheduler>();
public SiloAddress SiloAddress => localSiloDetails.SiloAddress;

public ILoggerFactory LoggerFactory => loggerFactory;
public GrainReferenceActivator GrainReferenceActivator => grainReferenceActivator;
public ITimerRegistry TimerRegistry => timerRegistry;

public RuntimeMessagingTrace MessagingTrace => new(loggerFactory);
public InsideRuntimeClient RuntimeClient => runtimeClient;
public ActivationDirectory ActivationDirectory => activations;
public WorkItemGroup CreateWorkItemGroup(SystemTarget systemTarget)
{
ArgumentNullException.ThrowIfNull(systemTarget);
return new WorkItemGroup(
systemTarget,
_workItemGroupLogger,
_activationTaskSchedulerLogger,
schedulingOptions);
}
}
Loading