diff --git a/src/Orleans.Runtime/Hosting/ActivationRepartitioningExtensions.cs b/src/Orleans.Runtime/Hosting/ActivationRepartitioningExtensions.cs index 38e28b89548..607052c5b06 100644 --- a/src/Orleans.Runtime/Hosting/ActivationRepartitioningExtensions.cs +++ b/src/Orleans.Runtime/Hosting/ActivationRepartitioningExtensions.cs @@ -20,7 +20,7 @@ public static class ActivationRepartitioningExtensions /// [Experimental("ORLEANSEXP001")] public static ISiloBuilder AddActivationRepartitioner(this ISiloBuilder builder) - => builder.AddActivationRepartitioner(); + => builder.AddActivationRepartitioner(); /// /// Enables activation repartitioning for this silo. @@ -52,4 +52,4 @@ private static IServiceCollection AddActivationRepartitioner(this IServic return services; } -} \ No newline at end of file +} diff --git a/src/Orleans.Runtime/Placement/Repartitioning/DefaultImbalanceRule.cs b/src/Orleans.Runtime/Placement/Repartitioning/DefaultImbalanceRule.cs deleted file mode 100644 index 1ece66059f9..00000000000 --- a/src/Orleans.Runtime/Placement/Repartitioning/DefaultImbalanceRule.cs +++ /dev/null @@ -1,56 +0,0 @@ -using System; -using System.Linq; -using System.Collections.Concurrent; -using System.Threading.Tasks; -using System.Threading; -using Orleans.Placement.Repartitioning; - -namespace Orleans.Runtime.Placement.Repartitioning; - -/// -/// Tolerance rule which is aware of the cluster size. -/// -internal sealed class DefaultImbalanceRule(ISiloStatusOracle siloStatusOracle) : IImbalanceToleranceRule, - ILifecycleParticipant, ILifecycleObserver, ISiloStatusListener -{ - private const double Baseline = 10.1d; - private readonly object _lock = new(); - private readonly ConcurrentDictionary _silos = new(); - private readonly ISiloStatusOracle _siloStatusOracle = siloStatusOracle; - - private uint _allowedImbalance = 0; - - public bool IsSatisfiedBy(uint imbalance) => imbalance <= _allowedImbalance; - - public void SiloStatusChangeNotification(SiloAddress silo, SiloStatus status) - { - _ = _silos.AddOrUpdate(silo, static (_, arg) => arg, static (_, _, arg) => arg, status); - lock (_lock) - { - var activeSilos = _silos.Count(s => s.Value == SiloStatus.Active); - var percentageOfBaseline = 100d / (1 + Math.Exp(0.07d * activeSilos - 4.8d)); // inverted sigmoid - if (percentageOfBaseline < 10d) - { - percentageOfBaseline = 10d; - } - - // silos: 2 => tolerance: ~ 1000 - // silos: 100 => tolerance: ~ 100 - _allowedImbalance = (uint)Math.Round(Baseline * percentageOfBaseline, 0); - } - } - - public void Participate(ISiloLifecycle lifecycle) - => lifecycle.Subscribe(nameof(DefaultImbalanceRule), ServiceLifecycleStage.ApplicationServices, this); - - public Task OnStart(CancellationToken cancellationToken = default) - { - _siloStatusOracle.SubscribeToSiloStatusEvents(this); - return Task.CompletedTask; - } - public Task OnStop(CancellationToken cancellationToken = default) - { - _siloStatusOracle.UnSubscribeFromSiloStatusEvents(this); - return Task.CompletedTask; - } -} \ No newline at end of file diff --git a/src/Orleans.Runtime/Placement/Repartitioning/RebalancerCompatibleRule.cs b/src/Orleans.Runtime/Placement/Repartitioning/RebalancerCompatibleRule.cs new file mode 100644 index 00000000000..6cb2c2ff8fe --- /dev/null +++ b/src/Orleans.Runtime/Placement/Repartitioning/RebalancerCompatibleRule.cs @@ -0,0 +1,80 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using System.Threading; +using Orleans.Placement.Repartitioning; +using Orleans.Placement.Rebalancing; +using System.Collections.Generic; +using System.Runtime.InteropServices; +using Microsoft.Extensions.DependencyInjection; + +namespace Orleans.Runtime.Placement.Repartitioning; + +#nullable enable + +/// +/// Tolerance rule which is aware of the cluster size, and if rebalancer is enabled, it scales with the clusters imbalance. +/// +/// https://www.ledjonbehluli.com/posts/orleans_repartioner_rebalancer_coexistence/ +internal class RebalancerCompatibleRule(IServiceProvider provider) : + IImbalanceToleranceRule, ILifecycleParticipant, + ILifecycleObserver, ISiloStatusListener, IActivationRebalancerReportListener +{ + private readonly object _lock = new(); + private readonly Dictionary _silos = []; + + private uint _pairwiseImbalance; + private double _clusterImbalance; // If rebalancer is not registered this has no effect on computing the tolerance. + + private readonly ISiloStatusOracle _oracle = provider.GetRequiredService(); + private readonly IActivationRebalancer? _rebalancer = provider.GetService(); + + public bool IsSatisfiedBy(uint imbalance) => imbalance <= _pairwiseImbalance; + + public void SiloStatusChangeNotification(SiloAddress silo, SiloStatus status) + { + lock (_lock) + { + ref var statusRef = ref CollectionsMarshal.GetValueRefOrAddDefault(_silos, silo, out _); + statusRef = status; + + var activeSilos = _silos.Count(s => s.Value == SiloStatus.Active); + var percentageOfBaseline = 100d / (1 + Math.Exp(0.07d * activeSilos - 4.8d)); + + if (percentageOfBaseline < 10d) percentageOfBaseline = 10d; + + var pairwiseImbalance = (uint)Math.Round(10.1d * percentageOfBaseline, 0); + var toleranceFactor = Math.Cos(Math.PI * _clusterImbalance / 2); // This will always be 1 if rebalancer is not registered. + + _pairwiseImbalance = (uint)Math.Max(pairwiseImbalance * toleranceFactor, 0); + } + } + + public void Participate(ISiloLifecycle lifecycle) + => lifecycle.Subscribe(nameof(RebalancerCompatibleRule), + ServiceLifecycleStage.ApplicationServices, this); + + public void OnReport(RebalancingReport report) + { + lock (_lock) + { + _clusterImbalance = report.ClusterImbalance; + } + } + + public Task OnStart(CancellationToken cancellationToken) + { + _oracle.SubscribeToSiloStatusEvents(this); + _rebalancer?.SubscribeToReports(this); + + return Task.CompletedTask; + } + + public Task OnStop(CancellationToken cancellationToken) + { + _oracle.UnSubscribeFromSiloStatusEvents(this); + _rebalancer?.UnsubscribeFromReports(this); + + return Task.CompletedTask; + } +}