Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ namespace Orleans.Placement;
[AttributeUsage(AttributeTargets.Class, AllowMultiple = true)]
public abstract class PlacementFilterAttribute : Attribute, IGrainPropertiesProviderAttribute
{
/// <summary>
/// Gets the placement filter strategy.
/// </summary>
public PlacementFilterStrategy PlacementFilterStrategy { get; private set; }

protected PlacementFilterAttribute(PlacementFilterStrategy placement)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using Orleans.Metadata;
using Orleans.Runtime;

#nullable enable
namespace Orleans.Placement;

/// <summary>
/// Represents a strategy for filtering silos which a grain can be placed on.
/// </summary>
public abstract class PlacementFilterStrategy
{
public int Order { get; private set; }
Expand Down Expand Up @@ -36,7 +40,6 @@ public void Initialize(GrainProperties properties)

public virtual void AdditionalInitialize(GrainProperties properties)
{

}

/// <summary>
Expand All @@ -58,7 +61,7 @@ public void PopulateGrainProperties(IServiceProvider services, Type grainClass,
properties[WellKnownGrainTypeProperties.PlacementFilter] = typeName;
}

properties[$"{WellKnownGrainTypeProperties.PlacementFilter}.{typeName}.order"] = Order.ToString();
properties[$"{WellKnownGrainTypeProperties.PlacementFilter}.{typeName}.order"] = Order.ToString(CultureInfo.InvariantCulture);

foreach (var additionalGrainProperty in GetAdditionalGrainProperties(services, grainClass, grainType, properties))
{
Expand Down
8 changes: 5 additions & 3 deletions src/Orleans.Core/Runtime/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ internal static class Constants
public static readonly GrainType DirectoryCacheValidatorType = SystemTargetGrainId.CreateGrainType("dir.cache-validator");
public static readonly GrainType ClientDirectoryType = SystemTargetGrainId.CreateGrainType("dir.client");
public static readonly GrainType SiloControlType = SystemTargetGrainId.CreateGrainType("silo-control");
public static readonly GrainType SiloMetadataType = SystemTargetGrainId.CreateGrainType("silo-metadata");
public static readonly GrainType CatalogType = SystemTargetGrainId.CreateGrainType("catalog");
public static readonly GrainType MembershipServiceType = SystemTargetGrainId.CreateGrainType("clustering");
public static readonly GrainType SystemMembershipTableType = SystemTargetGrainId.CreateGrainType("clustering.dev");
Expand All @@ -27,8 +28,8 @@ internal static class Constants
public static readonly GrainType ActivationMigratorType = SystemTargetGrainId.CreateGrainType("migrator");
public static readonly GrainType ActivationRepartitionerType = SystemTargetGrainId.CreateGrainType("repartitioner");
public static readonly GrainType ActivationRebalancerMonitorType = SystemTargetGrainId.CreateGrainType("rebalancer-monitor");
public static readonly GrainType GrainDirectoryPartition = SystemTargetGrainId.CreateGrainType("dir.grain.part");
public static readonly GrainType GrainDirectory = SystemTargetGrainId.CreateGrainType("dir.grain");
public static readonly GrainType GrainDirectoryPartitionType = SystemTargetGrainId.CreateGrainType("dir.grain.part");
public static readonly GrainType GrainDirectoryType = SystemTargetGrainId.CreateGrainType("dir.grain");

public static readonly GrainId SiloDirectConnectionId = GrainId.Create(
GrainType.Create(GrainTypePrefix.SystemPrefix + "silo"),
Expand All @@ -41,6 +42,7 @@ internal static class Constants
{DirectoryServiceType, "DirectoryService"},
{DirectoryCacheValidatorType, "DirectoryCacheValidator"},
{SiloControlType, "SiloControl"},
{SiloMetadataType, "SiloMetadata"},
{ClientDirectoryType, "ClientDirectory"},
{CatalogType,"Catalog"},
{MembershipServiceType,"MembershipService"},
Expand All @@ -57,7 +59,7 @@ internal static class Constants
{ActivationMigratorType, "ActivationMigrator"},
{ActivationRepartitionerType, "ActivationRepartitioner"},
{ActivationRebalancerMonitorType, "ActivationRebalancerMonitor"},
{GrainDirectory, "GrainDirectory"},
{GrainDirectoryType, "GrainDirectory"},
}.ToFrozenDictionary();

public static string SystemTargetName(GrainType id) => SingletonSystemTargetNames.TryGetValue(id, out var name) ? name : id.ToString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public DistributedGrainDirectory(
ILocalSiloDetails localSiloDetails,
ILoggerFactory loggerFactory,
IServiceProvider serviceProvider,
IInternalGrainFactory grainFactory) : base(Constants.GrainDirectory, localSiloDetails.SiloAddress, loggerFactory)
IInternalGrainFactory grainFactory) : base(Constants.GrainDirectoryType, localSiloDetails.SiloAddress, loggerFactory)
{
_serviceProvider = serviceProvider;
_membershipService = membershipService;
Expand Down
4 changes: 2 additions & 2 deletions src/Orleans.Runtime/GrainDirectory/GrainDirectoryPartition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ internal sealed partial class GrainDirectoryPartition(
IInternalGrainFactory grainFactory)
: SystemTarget(CreateGrainId(localSiloDetails.SiloAddress, partitionIndex), localSiloDetails.SiloAddress, loggerFactory), IGrainDirectoryPartition, IGrainDirectoryTestHooks
{
internal static SystemTargetGrainId CreateGrainId(SiloAddress siloAddress, int partitionIndex) => SystemTargetGrainId.Create(Constants.GrainDirectoryPartition, siloAddress, partitionIndex.ToString(CultureInfo.InvariantCulture));
internal static SystemTargetGrainId CreateGrainId(SiloAddress siloAddress, int partitionIndex) => SystemTargetGrainId.Create(Constants.GrainDirectoryPartitionType, siloAddress, partitionIndex.ToString(CultureInfo.InvariantCulture));
private readonly Dictionary<GrainId, GrainAddress> _directory = [];
private readonly int _partitionIndex = partitionIndex;
private readonly DistributedGrainDirectory _owner = owner;
Expand Down Expand Up @@ -665,7 +665,7 @@ private async IAsyncEnumerable<List<GrainAddress>> GetRegisteredActivations(Dire
async Task<List<GrainAddress>> GetRegisteredActivationsFromClusterMember(MembershipVersion version, RingRange range, SiloAddress siloAddress, bool isValidation)
{
var stopwatch = ValueStopwatch.StartNew();
var client = _grainFactory.GetSystemTarget<IGrainDirectoryClient>(Constants.GrainDirectory, siloAddress);
var client = _grainFactory.GetSystemTarget<IGrainDirectoryClient>(Constants.GrainDirectoryType, siloAddress);
var result = await InvokeOnClusterMember(
siloAddress,
async () =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#nullable enable

namespace Orleans.Runtime.MembershipService.SiloMetadata;

public interface ISiloMetadataCache
{
SiloMetadata GetMetadata(SiloAddress siloAddress);
SiloMetadata GetSiloMetadata(SiloAddress siloAddress);
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
using System.Threading.Tasks;
using Orleans.Services;

#nullable enable
namespace Orleans.Runtime.MembershipService.SiloMetadata;

public interface ISiloMetadataClient : IGrainServiceClient<ISiloMetadataGrainService>
internal interface ISiloMetadataClient
{
Task<SiloMetadata> GetSiloMetadata(SiloAddress siloAddress);
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
using System.Threading.Tasks;
using Orleans.Services;

#nullable enable
namespace Orleans.Runtime.MembershipService.SiloMetadata;

[Alias("Orleans.Runtime.MembershipService.SiloMetadata.ISiloMetadataGrainService")]
public interface ISiloMetadataGrainService : IGrainService
[Alias("Orleans.Runtime.MembershipService.SiloMetadata.ISiloMetadataSystemTarget")]
internal interface ISiloMetadataSystemTarget : ISystemTarget
{
[Alias("GetSiloMetadata")]
Task<SiloMetadata> GetSiloMetadata();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Orleans.Configuration;
using Orleans.Internal;

#nullable enable
namespace Orleans.Runtime.MembershipService.SiloMetadata;
Expand All @@ -22,44 +20,44 @@ internal class SiloMetadataCache(

void ILifecycleParticipant<ISiloLifecycle>.Participate(ISiloLifecycle lifecycle)
{
var tasks = new List<Task>(1);
var cancellation = new CancellationTokenSource();
Task OnRuntimeInitializeStart(CancellationToken _)
Task? task = null;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This previous code was copied from DistributedGrainDirectory. Should there be a follow up issue to update usages of the prior pattern with this one?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we should clean up the other instances.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created #9360

Task OnStart(CancellationToken _)
{
tasks.Add(Task.Run(() => this.ProcessMembershipUpdates(cancellation.Token)));
task = Task.Run(() => this.ProcessMembershipUpdates(_cts.Token));
return Task.CompletedTask;
}

async Task OnRuntimeInitializeStop(CancellationToken ct)
async Task OnStop(CancellationToken ct)
{
cancellation.Cancel(throwOnFirstException: false);
var shutdownGracePeriod = Task.WhenAll(Task.Delay(ClusterMembershipOptions.ClusteringShutdownGracePeriod), ct.WhenCancelled());
await Task.WhenAny(shutdownGracePeriod, Task.WhenAll(tasks));
await _cts.CancelAsync().ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
if (task is not null)
{
await task.WaitAsync(ct).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
}
}

lifecycle.Subscribe(
nameof(ClusterMembershipService),
ServiceLifecycleStage.RuntimeInitialize,
OnRuntimeInitializeStart,
OnRuntimeInitializeStop);
ServiceLifecycleStage.RuntimeServices,
OnStart,
OnStop);
}


private async Task ProcessMembershipUpdates(CancellationToken ct)
{
try
{
if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("Starting to process membership updates");
if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug("Starting to process membership updates.");
await foreach (var update in membershipTableManager.MembershipTableUpdates.WithCancellation(ct))
{
// Add entries for members that aren't already in the cache
foreach (var membershipEntry in update.Entries.Where(e => e.Value.Status != SiloStatus.Dead))
foreach (var membershipEntry in update.Entries.Where(e => e.Value.Status is SiloStatus.Active or SiloStatus.Joining))
{
if (!_metadata.ContainsKey(membershipEntry.Key))
{
try
{
var metadata = await siloMetadataClient.GetSiloMetadata(membershipEntry.Key);
var metadata = await siloMetadataClient.GetSiloMetadata(membershipEntry.Key).WaitAsync(ct);
_metadata.TryAdd(membershipEntry.Key, metadata);
}
catch(Exception exception)
Expand All @@ -85,6 +83,10 @@ private async Task ProcessMembershipUpdates(CancellationToken ct)
}
}
}
catch (OperationCanceledException) when (ct.IsCancellationRequested)
{
// Ignore and continue shutting down.
}
catch (Exception exception)
{
logger.LogError(exception, "Error processing membership updates");
Expand All @@ -95,7 +97,7 @@ private async Task ProcessMembershipUpdates(CancellationToken ct)
}
}

public SiloMetadata GetMetadata(SiloAddress siloAddress) => _metadata.GetValueOrDefault(siloAddress) ?? SiloMetadata.Empty;
public SiloMetadata GetSiloMetadata(SiloAddress siloAddress) => _metadata.GetValueOrDefault(siloAddress) ?? SiloMetadata.Empty;

public void SetMetadata(SiloAddress siloAddress, SiloMetadata metadata) => _metadata.TryAdd(siloAddress, metadata);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
using System;
using System.Threading.Tasks;
using Orleans.Runtime.Services;

#nullable enable
namespace Orleans.Runtime.MembershipService.SiloMetadata;

public class SiloMetadataClient(IServiceProvider serviceProvider)
: GrainServiceClient<ISiloMetadataGrainService>(serviceProvider), ISiloMetadataClient
internal sealed class SiloMetadataClient(IInternalGrainFactory grainFactory) : ISiloMetadataClient
{
public async Task<SiloMetadata> GetSiloMetadata(SiloAddress siloAddress)
{
var grainService = GetGrainService(siloAddress);
var metadata = await grainService.GetSiloMetadata();
var metadataSystemTarget = grainFactory.GetSystemTarget<ISiloMetadataSystemTarget>(Constants.SiloMetadataType, siloAddress);
var metadata = await metadataSystemTarget.GetSiloMetadata();
return metadata;
}
}
Original file line number Diff line number Diff line change
@@ -1,23 +1,34 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

#nullable enable
namespace Orleans.Runtime.MembershipService.SiloMetadata;

public class SiloMetadataGrainService : GrainService, ISiloMetadataGrainService
internal sealed class SiloMetadataSystemTarget(
IOptions<SiloMetadata> siloMetadata,
ILocalSiloDetails localSiloDetails,
ILoggerFactory loggerFactory,
IServiceProvider serviceProvider)
: SystemTarget(Constants.SiloMetadataType, localSiloDetails.SiloAddress, loggerFactory), ISiloMetadataSystemTarget, ILifecycleParticipant<ISiloLifecycle>
{
private readonly SiloMetadata _siloMetadata;
private readonly SiloMetadata _siloMetadata = siloMetadata.Value;

public SiloMetadataGrainService(IOptions<SiloMetadata> siloMetadata) : base()
{
_siloMetadata = siloMetadata.Value;
}
public Task<SiloMetadata> GetSiloMetadata() => Task.FromResult(_siloMetadata);

public SiloMetadataGrainService(IOptions<SiloMetadata> siloMetadata, GrainId grainId, Silo silo, ILoggerFactory loggerFactory) : base(grainId, silo, loggerFactory)
void ILifecycleParticipant<ISiloLifecycle>.Participate(ISiloLifecycle lifecycle)
{
_siloMetadata = siloMetadata.Value;
}
lifecycle.Subscribe(nameof(SiloMetadataSystemTarget), ServiceLifecycleStage.RuntimeInitialize, OnRuntimeInitializeStart, OnRuntimeInitializeStop);

public Task<SiloMetadata> GetSiloMetadata() => Task.FromResult(_siloMetadata);
Task OnRuntimeInitializeStart(CancellationToken token)
{
serviceProvider.GetRequiredService<Catalog>().RegisterSystemTarget(this);
return Task.CompletedTask;
}

Task OnRuntimeInitializeStop(CancellationToken token) => Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ namespace Orleans.Runtime.MembershipService.SiloMetadata;

public static class SiloMetadataHostingExtensions
{

/// <summary>
/// Configure silo metadata from the builder configuration.
/// </summary>
Expand Down Expand Up @@ -58,7 +57,7 @@ public static ISiloBuilder UseSiloMetadata(this ISiloBuilder builder, IConfigura
{
var dictionary = configurationSection.Get<Dictionary<string, string>>();

return builder.UseSiloMetadata(dictionary ?? new Dictionary<string, string>());
return builder.UseSiloMetadata(dictionary ?? []);
}

/// <summary>
Expand All @@ -73,16 +72,15 @@ public static ISiloBuilder UseSiloMetadata(this ISiloBuilder builder, Dictionary
{
services
.AddOptionsWithValidateOnStart<SiloMetadata>()
.Configure(m =>
{
m.AddMetadata(metadata);
});
.Configure(m => m.AddMetadata(metadata));

services.AddGrainService<SiloMetadataGrainService>();
services.AddSingleton<SiloMetadataSystemTarget>();
services.AddFromExisting<ILifecycleParticipant<ISiloLifecycle>, SiloMetadataSystemTarget>();
services.AddSingleton<SiloMetadataCache>();
services.AddFromExisting<ISiloMetadataCache, SiloMetadataCache>();
services.AddFromExisting<ILifecycleParticipant<ISiloLifecycle>, SiloMetadataCache>();
services.AddSingleton<ISiloMetadataClient, SiloMetadataClient>();

// Placement filters
services.AddPlacementFilter<PreferredMatchSiloMetadataPlacementFilterStrategy, PreferredMatchSiloMetadataPlacementFilterDirector>(ServiceLifetime.Transient);
services.AddPlacementFilter<RequiredMatchSiloMetadataPlacementFilterStrategy, RequiredMatchSiloMetadataPlacementFilterDirector>(ServiceLifetime.Transient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ internal class PreferredMatchSiloMetadataPlacementFilterDirector(
{
public IEnumerable<SiloAddress> Filter(PlacementFilterStrategy filterStrategy, PlacementTarget target, IEnumerable<SiloAddress> silos)
{
var preferredMatchSiloMetadataPlacementFilterStrategy = (filterStrategy as PreferredMatchSiloMetadataPlacementFilterStrategy);
var preferredMatchSiloMetadataPlacementFilterStrategy = filterStrategy as PreferredMatchSiloMetadataPlacementFilterStrategy;
var minCandidates = preferredMatchSiloMetadataPlacementFilterStrategy?.MinCandidates ?? 1;
var orderedMetadataKeys = preferredMatchSiloMetadataPlacementFilterStrategy?.OrderedMetadataKeys ?? [];

var localSiloMetadata = siloMetadataCache.GetMetadata(localSiloDetails.SiloAddress).Metadata;
var localSiloMetadata = siloMetadataCache.GetSiloMetadata(localSiloDetails.SiloAddress).Metadata;

if (localSiloMetadata.Count == 0)
{
Expand All @@ -39,7 +39,7 @@ public IEnumerable<SiloAddress> Filter(PlacementFilterStrategy filterStrategy, P
var scoreCounts = new int[orderedMetadataKeys.Length+1];
for (var i = 0; i < siloList.Count; i++)
{
var siloMetadata = siloMetadataCache.GetMetadata(siloList[i]).Metadata;
var siloMetadata = siloMetadataCache.GetSiloMetadata(siloList[i]).Metadata;
var siloScore = 0;
for (var j = orderedMetadataKeys.Length - 1; j >= 0; --j)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using Orleans.Metadata;
using Orleans.Placement;

Expand All @@ -23,6 +24,7 @@ public override void AdditionalInitialize(GrainProperties properties)
{
throw new ArgumentException("Invalid ordered-metadata-keys property value.");
}

OrderedMetadataKeys = placementFilterGrainProperty.Split(",");
var minCandidatesProperty = GetPlacementFilterGrainProperty("min-candidates", properties);
if (!int.TryParse(minCandidatesProperty, out var parsedMinCandidates))
Expand All @@ -37,6 +39,6 @@ protected override IEnumerable<KeyValuePair<string, string>> GetAdditionalGrainP
IReadOnlyDictionary<string, string> existingProperties)
{
yield return new KeyValuePair<string, string>("ordered-metadata-keys", string.Join(",", OrderedMetadataKeys));
yield return new KeyValuePair<string, string>("min-candidates", MinCandidates.ToString());
yield return new KeyValuePair<string, string>("min-candidates", MinCandidates.ToString(CultureInfo.InvariantCulture));
}
}
Loading
Loading