diff --git a/src/Orleans.Runtime/Configuration/Options/GrainDirectoryOptions.cs b/src/Orleans.Runtime/Configuration/Options/GrainDirectoryOptions.cs index dc9ffcb47ec..f3509a3df39 100644 --- a/src/Orleans.Runtime/Configuration/Options/GrainDirectoryOptions.cs +++ b/src/Orleans.Runtime/Configuration/Options/GrainDirectoryOptions.cs @@ -16,6 +16,7 @@ public enum CachingStrategyType /// Standard fixed-size LRU. LRU, /// Adaptive caching with fixed maximum size and refresh. This option should be used in production. + [Obsolete("Adaptive caching is deprecated in favor of LRU and will be removed in a future version. This value is now an alias for LRU.")] Adaptive, /// Custom cache implementation, configured by registering an implementation in the dependency injection container. Custom @@ -48,31 +49,37 @@ public enum CachingStrategyType /// /// Gets or sets the initial (minimum) time, in seconds, to keep a cache entry before revalidating. /// + [Obsolete("InitialCacheTTL is deprecated and will be removed in a future version.")] public TimeSpan InitialCacheTTL { get; set; } = DEFAULT_INITIAL_CACHE_TTL; /// /// The default value for . /// + [Obsolete("DEFAULT_INITIAL_CACHE_TTL is deprecated and will be removed in a future version.")] public static readonly TimeSpan DEFAULT_INITIAL_CACHE_TTL = TimeSpan.FromSeconds(30); /// /// Gets or sets the maximum time, in seconds, to keep a cache entry before revalidating. /// + [Obsolete("MaximumCacheTTL is deprecated and will be removed in a future version.")] public TimeSpan MaximumCacheTTL { get; set; } = DEFAULT_MAXIMUM_CACHE_TTL; /// /// The default value for . /// + [Obsolete("DEFAULT_MAXIMUM_CACHE_TTL is deprecated and will be removed in a future version.")] public static readonly TimeSpan DEFAULT_MAXIMUM_CACHE_TTL = TimeSpan.FromSeconds(240); /// /// Gets or sets the factor by which cache entry TTLs should be extended when they are found to be stable. /// + [Obsolete("CacheTTLExtensionFactor is deprecated and will be removed in a future version.")] public double CacheTTLExtensionFactor { get; set; } = DEFAULT_TTL_EXTENSION_FACTOR; /// /// The default value for . /// + [Obsolete("DEFAULT_TTL_EXTENSION_FACTOR is deprecated and will be removed in a future version.")] public const double DEFAULT_TTL_EXTENSION_FACTOR = 2.0; /// diff --git a/src/Orleans.Runtime/GrainDirectory/AdaptiveDirectoryCacheMaintainer.cs b/src/Orleans.Runtime/GrainDirectory/AdaptiveDirectoryCacheMaintainer.cs deleted file mode 100644 index 8e4c299abb7..00000000000 --- a/src/Orleans.Runtime/GrainDirectory/AdaptiveDirectoryCacheMaintainer.cs +++ /dev/null @@ -1,300 +0,0 @@ -#nullable enable -using System; -using System.Collections.Generic; -using System.Diagnostics; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.Extensions.Logging; -using Orleans.GrainDirectory; -using Orleans.Runtime.Scheduler; - -namespace Orleans.Runtime.GrainDirectory -{ - internal sealed partial class AdaptiveDirectoryCacheMaintainer - { - private static readonly TimeSpan SLEEP_TIME_BETWEEN_REFRESHES = Debugger.IsAttached ? TimeSpan.FromMinutes(5) : TimeSpan.FromMinutes(1); // this should be something like minTTL/4 - - private readonly ILogger _log; - private readonly AdaptiveGrainDirectoryCache cache; - private readonly LocalGrainDirectory router; - private readonly IInternalGrainFactory grainFactory; - private readonly CancellationTokenSource _shutdownCts = new(); - - private long lastNumAccesses; // for stats - private long lastNumHits; // for stats - private Task? _runTask; - - internal AdaptiveDirectoryCacheMaintainer( - LocalGrainDirectory router, - AdaptiveGrainDirectoryCache cache, - IInternalGrainFactory grainFactory, - ILoggerFactory loggerFactory) - { - _log = loggerFactory.CreateLogger(); - this.grainFactory = grainFactory; - this.router = router; - this.cache = cache; - - lastNumAccesses = 0; - lastNumHits = 0; - } - - public void Start() - { - _runTask = Run(); - } - - public async Task StopAsync() - { - _shutdownCts.Cancel(); - if (_runTask is { } task) - { - await task; - } - } - - private async Task Run() - { - // Immediately yield back to the caller - await Task.CompletedTask.ConfigureAwait(ConfigureAwaitOptions.ForceYielding | ConfigureAwaitOptions.ContinueOnCapturedContext); - - var cancellationToken = _shutdownCts.Token; - while (!cancellationToken.IsCancellationRequested) - { - try - { - // recheck every X seconds (Consider making it a configurable parameter) - await Task.Delay(SLEEP_TIME_BETWEEN_REFRESHES, cancellationToken).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing | ConfigureAwaitOptions.ContinueOnCapturedContext); - if (cancellationToken.IsCancellationRequested) - { - break; - } - - // Run through all cache entries and do the following: - // 1. If the entry is not expired, skip it - // 2. If the entry is expired and was not accessed in the last time interval -- throw it away - // 3. If the entry is expired and was accessed in the last time interval, put into "fetch-batch-requests" list - - // At the end of the process, fetch batch requests for entries that need to be refreshed - - // Upon receiving refreshing answers, if the entry was not changed, double its expiration timer. - // If it was changed, update the cache and reset the expiration timer. - - // this dictionary holds a map between a silo address and the list of grains that need to be refreshed - var fetchInBatchList = new Dictionary>(); - - // get the list of cached grains - - // Stats for debugging. - int ownedAndRemovedCount = 0, keptCount = 0, removedCount = 0, refreshedCount = 0; - - // run through all cache entries - var enumerator = cache.GetStoredEntries(); - while (enumerator.MoveNext()) - { - var pair = enumerator.Current; - GrainId grain = pair.Key; - var entry = pair.Value; - - var owner = router.CalculateGrainDirectoryPartition(grain); - if (owner == null) // Null means there's no other silo and we're shutting down, so skip this entry - { - continue; - } - - if (entry == null) - { - // 0. If the entry was deleted in parallel, presumably due to cleanup after silo death - cache.Remove(grain); - removedCount++; // for debug - } - else if (!entry.IsExpired()) - { - // 1. If the entry is not expired, skip it - keptCount++; // for debug - } - else if (entry.NumAccesses == 0) - { - // 2. If the entry is expired and was not accessed in the last time interval -- throw it away - cache.Remove(grain); - removedCount++; // for debug - } - else - { - // 3. If the entry is expired and was accessed in the last time interval, put into "fetch-batch-requests" list - if (!fetchInBatchList.TryGetValue(owner, out var list)) - { - fetchInBatchList[owner] = list = new List(); - } - - list.Add(grain); - // And reset the entry's access count for next time - entry.NumAccesses = 0; - refreshedCount++; // for debug - } - } - - LogTraceSelfOwnedAndRemoved(_log, router.MyAddress, ownedAndRemovedCount, keptCount, removedCount, refreshedCount); - - // Send batch requests - SendBatchCacheRefreshRequests(fetchInBatchList); - - ProduceStats(); - } - catch (Exception ex) when (!cancellationToken.IsCancellationRequested) - { - LogErrorAdaptiveDirectoryCacheMaintainer(ex); - } - } - } - - private void SendBatchCacheRefreshRequests(Dictionary> refreshRequests) - { - foreach (var kv in refreshRequests) - { - var cachedGrainAndETagList = BuildGrainAndETagList(kv.Value); - - var silo = kv.Key; - - DirectoryInstruments.ValidationsCacheSent.Add(1); - // Send all of the items in one large request - var validator = this.grainFactory.GetSystemTarget(Constants.DirectoryCacheValidatorType, silo); - - router.CacheValidator.QueueTask(async () => - { - var response = await validator.LookUpMany(cachedGrainAndETagList); - ProcessCacheRefreshResponse(silo, response); - }).Ignore(); - - LogTraceSendingRequest(_log, router.MyAddress, silo, cachedGrainAndETagList.Count); - } - } - - private void ProcessCacheRefreshResponse( - SiloAddress silo, - List refreshResponse) - { - LogTraceReceivedProcessCacheRefreshResponse(_log, router.MyAddress, refreshResponse.Count); - - int otherSiloCount = 0, updatedCount = 0, unchangedCount = 0; - - // pass through returned results and update the cache if needed - foreach (var tuple in refreshResponse) - { - if (tuple.Address is { IsComplete: true }) - { - // the server returned an updated entry - cache.AddOrUpdate(tuple.Address, tuple.VersionTag); - otherSiloCount++; - } - else if (tuple.Address is { IsComplete: false }) - { - if (tuple.VersionTag == -1) - { - // The server indicates that it does not own the grain anymore. - // It could be that by now, the cache has been already updated and contains an entry received from another server (i.e., current owner for the grain). - // For simplicity, we do not care about this corner case and simply remove the cache entry. - cache.Remove(tuple.Address.GrainId); - updatedCount++; - } - else - { - // The server returned only a (not -1) generation number, indicating that we hold the most - // updated copy of the grain's activations list. - // Validate that the generation number in the request and the response are equal - // Contract.Assert(tuple.Item2 == refreshRequest.Find(o => o.Item1 == tuple.Item1).Item2); - // refresh the entry in the cache - cache.MarkAsFresh(tuple.Address.GrainId); - unchangedCount++; - } - } - } - - LogTraceProcessedRefreshResponse(_log, router.MyAddress, silo, otherSiloCount, updatedCount, unchangedCount); - } - - /// - /// Gets the list of grains (all owned by the same silo) and produces a new list - /// of tuples, where each tuple holds the grain and its generation counter currently stored in the cache - /// - /// List of grains owned by the same silo - /// List of grains in input along with their generation counters stored in the cache - private List<(GrainId, int)> BuildGrainAndETagList(List grains) - { - var grainAndETagList = new List<(GrainId, int)>(); - - foreach (GrainId grain in grains) - { - // NOTE: should this be done with TryGet? Won't Get invoke the LRU getter function? - AdaptiveGrainDirectoryCache.GrainDirectoryCacheEntry entry = cache.Get(grain); - - if (entry != null) - { - grainAndETagList.Add((grain, entry.ETag)); - } - else - { - // this may happen only if the LRU cache is full and decided to drop this grain - // while we try to refresh it - LogWarningGrainDisappearedFromCache(grain); - } - } - - return grainAndETagList; - } - - private void ProduceStats() - { - // We do not want to synchronize the access on numAccess and numHits in cache to avoid performance issues. - // Thus we take the current reading of these fields and calculate the stats. We might miss an access or two, - // but it should not be matter. - long curNumAccesses = cache.NumAccesses; - long curNumHits = cache.NumHits; - - long numAccesses = curNumAccesses - lastNumAccesses; - long numHits = curNumHits - lastNumHits; - - if (_log.IsEnabled(LogLevel.Trace)) _log.LogTrace("#accesses: {AccessCount}, hit-ratio: {HitRatio}%", numAccesses, (numHits / Math.Max(numAccesses, 0.00001)) * 100); - - lastNumAccesses = curNumAccesses; - lastNumHits = curNumHits; - } - - [LoggerMessage( - Level = LogLevel.Error, - Message = $"Error in {nameof(AdaptiveDirectoryCacheMaintainer)}." - )] - private partial void LogErrorAdaptiveDirectoryCacheMaintainer(Exception ex); - - [LoggerMessage( - EventId = (int)ErrorCode.Runtime_Error_100199, - Level = LogLevel.Warning, - Message = "Grain {GrainId} disappeared from the cache during maintenance" - )] - private partial void LogWarningGrainDisappearedFromCache(GrainId grainId); - - [LoggerMessage( - Level = LogLevel.Trace, - Message = "Silo {SiloAddress} self-owned (and removed) {OwnedAndRemovedCount}, kept {KeptCount}, removed {RemovedCount} and tried to refresh {RefreshedCount} grains" - )] - private static partial void LogTraceSelfOwnedAndRemoved(ILogger logger, SiloAddress siloAddress, int ownedAndRemovedCount, int keptCount, int removedCount, int refreshedCount); - - [LoggerMessage( - Level = LogLevel.Trace, - Message = "Silo {SiloAddress} is sending request to silo {OwnerSilo} with {Count} entries" - )] - private static partial void LogTraceSendingRequest(ILogger logger, SiloAddress siloAddress, SiloAddress ownerSilo, int count); - - [LoggerMessage( - Level = LogLevel.Trace, - Message = "Silo {SiloAddress} received ProcessCacheRefreshResponse. #Response entries {Count}." - )] - private static partial void LogTraceReceivedProcessCacheRefreshResponse(ILogger logger, SiloAddress siloAddress, int count); - - [LoggerMessage( - Level = LogLevel.Trace, - Message = "Silo {SiloAddress} processed refresh response from {OtherSilo} with {UpdatedCount} updated, {RemovedCount} removed, {UnchangedCount} unchanged grains" - )] - private static partial void LogTraceProcessedRefreshResponse(ILogger logger, SiloAddress siloAddress, SiloAddress otherSilo, int updatedCount, int removedCount, int unchangedCount); - } -} diff --git a/src/Orleans.Runtime/GrainDirectory/AdaptiveGrainDirectoryCache.cs b/src/Orleans.Runtime/GrainDirectory/AdaptiveGrainDirectoryCache.cs deleted file mode 100644 index 369640e368f..00000000000 --- a/src/Orleans.Runtime/GrainDirectory/AdaptiveGrainDirectoryCache.cs +++ /dev/null @@ -1,168 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using Orleans.Caching; -using Orleans.Internal; - -namespace Orleans.Runtime.GrainDirectory -{ - internal class AdaptiveGrainDirectoryCache : IGrainDirectoryCache - { - internal class GrainDirectoryCacheEntry - { - internal GrainAddress Address { get; } - private CoarseStopwatch LastRefreshed { get; set; } - internal TimeSpan ExpirationTimer { get; private set; } - internal int ETag { get; } - - /// - /// flag notifying whether this cache entry was accessed lately - /// (more precisely, since the last refresh) - /// - internal int NumAccesses { get; set; } - - internal GrainDirectoryCacheEntry(GrainAddress value, int etag, TimeSpan expirationTimer) - { - Address = value; - ETag = etag; - ExpirationTimer = expirationTimer; - LastRefreshed = CoarseStopwatch.StartNew(); - NumAccesses = 0; - } - - internal bool IsExpired() - { - return LastRefreshed.Elapsed >= ExpirationTimer; - } - - internal void Refresh(TimeSpan newExpirationTimer) - { - LastRefreshed = CoarseStopwatch.StartNew(); - ExpirationTimer = newExpirationTimer; - } - } - - private static readonly Func ActivationAddressesMatches = (entry, addr) => GrainAddress.MatchesGrainIdAndSilo(addr, entry.Address); - - private readonly ConcurrentLruCache cache; - /// controls the time the new entry is considered "fresh" (unit: ms) - private readonly TimeSpan initialExpirationTimer; - /// controls the exponential growth factor (i.e., x2, x4) for the freshness timer (unit: none) - private readonly double exponentialTimerGrowth; - // controls the boundary on the expiration timer - private readonly TimeSpan maxExpirationTimer; - - internal long NumAccesses; // number of cache item accesses (for stats) - internal long NumHits; // number of cache access hits (for stats) - - internal long LastNumAccesses; - internal long LastNumHits; - - public AdaptiveGrainDirectoryCache(TimeSpan initialExpirationTimer, TimeSpan maxExpirationTimer, double exponentialTimerGrowth, int maxCacheSize) - { - cache = new(maxCacheSize); - - this.initialExpirationTimer = initialExpirationTimer; - this.maxExpirationTimer = maxExpirationTimer; - this.exponentialTimerGrowth = exponentialTimerGrowth; - - DirectoryInstruments.RegisterCacheSizeObserve(() => cache.Count); - } - - public void AddOrUpdate(GrainAddress value, int version) - { - var entry = new GrainDirectoryCacheEntry(value, version, initialExpirationTimer); - - // Notice that LRU should know how to throw the oldest entry if the cache is full - cache.AddOrUpdate(value.GrainId, entry); - } - - public bool Remove(GrainId key) => cache.TryRemove(key); - - public bool Remove(GrainAddress key) => cache.TryRemove(key.GrainId, ActivationAddressesMatches, key); - - public void Clear() => cache.Clear(); - - public bool LookUp(GrainId key, out GrainAddress result, out int version) - { - NumAccesses++; // for stats - - // Here we do not check whether the found entry is expired. - // It will be done by the thread managing the cache. - // This is to avoid situation where the entry was just expired, but the manager still have not run and have not refreshed it. - if (!cache.TryGet(key, out var tmp)) - { - result = default; - version = default; - return false; - } - - NumHits++; // for stats - tmp.NumAccesses++; - result = tmp.Address; - version = tmp.ETag; - return true; - } - - public IEnumerable<(GrainAddress ActivationAddress, int Version)> KeyValues - { - get - { - foreach (var value in cache) - { - yield return (value.Value.Address, value.Value.ETag); - } - } - } - - public bool MarkAsFresh(GrainId key) - { - GrainDirectoryCacheEntry result; - if (!cache.TryGet(key, out result)) return false; - - TimeSpan newExpirationTimer = StandardExtensions.Min(maxExpirationTimer, result.ExpirationTimer.Multiply(exponentialTimerGrowth)); - result.Refresh(newExpirationTimer); - - return true; - } - - internal GrainDirectoryCacheEntry Get(GrainId key) - { - return cache.Get(key); - } - - internal IEnumerator> GetStoredEntries() - { - return cache.GetEnumerator(); - } - - public override string ToString() - { - var sb = new StringBuilder(); - - long curNumAccesses = NumAccesses - LastNumAccesses; - LastNumAccesses = NumAccesses; - long curNumHits = NumHits - LastNumHits; - LastNumHits = NumHits; - - sb.Append("Adaptive cache statistics:").AppendLine(); - sb.AppendFormat(" Cache size: {0} entries ({1} maximum)", cache.Count, cache.Capacity).AppendLine(); - sb.AppendFormat(" Since last call:").AppendLine(); - sb.AppendFormat(" Accesses: {0}", curNumAccesses); - sb.AppendFormat(" Hits: {0}", curNumHits); - if (curNumAccesses > 0) - { - sb.AppendFormat(" Hit Rate: {0:F1}%", (100.0 * curNumHits) / curNumAccesses).AppendLine(); - } - sb.AppendFormat(" Since start:").AppendLine(); - sb.AppendFormat(" Accesses: {0}", LastNumAccesses); - sb.AppendFormat(" Hits: {0}", LastNumHits); - if (LastNumAccesses > 0) - { - sb.AppendFormat(" Hit Rate: {0:F1}%", (100.0 * LastNumHits) / LastNumAccesses).AppendLine(); - } - - return sb.ToString(); - } - } -} diff --git a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryCacheFactory.cs b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryCacheFactory.cs index 6ec31feb2b6..c9a840ceb01 100644 --- a/src/Orleans.Runtime/GrainDirectory/GrainDirectoryCacheFactory.cs +++ b/src/Orleans.Runtime/GrainDirectory/GrainDirectoryCacheFactory.cs @@ -27,9 +27,10 @@ public static IGrainDirectoryCache CreateGrainDirectoryCache(IServiceProvider se case GrainDirectoryOptions.CachingStrategyType.None: return new NullGrainDirectoryCache(); case GrainDirectoryOptions.CachingStrategyType.LRU: - return new LruGrainDirectoryCache(options.CacheSize); +#pragma warning disable CS0618 // Type or member is obsolete case GrainDirectoryOptions.CachingStrategyType.Adaptive: - return new AdaptiveGrainDirectoryCache(options.InitialCacheTTL, options.MaximumCacheTTL, options.CacheTTLExtensionFactor, options.CacheSize); +#pragma warning restore CS0618 // Type or member is obsolete + return new LruGrainDirectoryCache(options.CacheSize); case GrainDirectoryOptions.CachingStrategyType.Custom: default: return services.GetRequiredService(); @@ -48,18 +49,6 @@ internal static IGrainDirectoryCache CreateCustomGrainDirectoryCache(IServicePro return new LruGrainDirectoryCache(options.CacheSize); } } - - internal static AdaptiveDirectoryCacheMaintainer CreateGrainDirectoryCacheMaintainer( - LocalGrainDirectory router, - IGrainDirectoryCache cache, - IInternalGrainFactory grainFactory, - ILoggerFactory loggerFactory) - { - var adaptiveCache = cache as AdaptiveGrainDirectoryCache; - return adaptiveCache != null - ? new AdaptiveDirectoryCacheMaintainer(router, adaptiveCache, grainFactory, loggerFactory) - : null; - } } internal class NullGrainDirectoryCache : IGrainDirectoryCache diff --git a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs index b795f5dea01..e16107066c5 100644 --- a/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs +++ b/src/Orleans.Runtime/GrainDirectory/LocalGrainDirectory.cs @@ -16,7 +16,6 @@ namespace Orleans.Runtime.GrainDirectory { internal sealed partial class LocalGrainDirectory : ILocalGrainDirectory, ISiloStatusListener, ILifecycleParticipant { - private readonly AdaptiveDirectoryCacheMaintainer maintainer; private readonly ILogger log; private readonly SiloAddress? seed; private readonly ISiloStatusOracle siloStatusOracle; @@ -60,12 +59,6 @@ public LocalGrainDirectory( this.grainFactory = grainFactory; DirectoryCache = GrainDirectoryCacheFactory.CreateGrainDirectoryCache(serviceProvider, grainDirectoryOptions.Value); - maintainer = - GrainDirectoryCacheFactory.CreateGrainDirectoryCacheMaintainer( - this, - this.DirectoryCache, - grainFactory, - loggerFactory); var primarySiloEndPoint = developmentClusterMembershipOptions.Value.PrimarySiloEndpoint; if (primarySiloEndPoint != null) @@ -99,10 +92,6 @@ public void Start() LogDebugStart(); Running = true; - if (maintainer != null) - { - CacheValidator.WorkItemGroup.QueueAction(maintainer.Start); - } siloStatusOracle.SubscribeToSiloStatusEvents(this); } @@ -113,7 +102,7 @@ public void Start() // The alternative would be to allow the silo to process requests after it has handed off its partition, in which case those changes // would receive successful responses but would not be reflected in the eventual state of the directory. // It's easy to change this, if we think the trade-off is better the other way. - public async Task StopAsync() + public Task StopAsync() { // This will cause remote write requests to be forwarded to the silo that will become the new owner. // Requests might bounce back and forth for a while as membership stabilizes, but they will either be served by the @@ -123,13 +112,9 @@ public async Task StopAsync() //mark Running as false will exclude myself from CalculateGrainDirectoryPartition(grainId) Running = false; - if (maintainer is { } directoryCacheMaintainer) - { - await CacheValidator.QueueTask(directoryCacheMaintainer.StopAsync); - } - DirectoryPartition.Clear(); DirectoryCache.Clear(); + return Task.CompletedTask; } private void AddServer(SiloAddress silo) diff --git a/test/TesterInternal/General/ElasticPlacementTest.cs b/test/TesterInternal/General/ElasticPlacementTest.cs index 57f06ddb091..1a6405d6d80 100644 --- a/test/TesterInternal/General/ElasticPlacementTest.cs +++ b/test/TesterInternal/General/ElasticPlacementTest.cs @@ -3,6 +3,7 @@ using Microsoft.Extensions.Logging; using Orleans.Configuration; using Orleans.Runtime; +using Orleans.Runtime.Placement; using Orleans.TestingHost; using TestExtensions; using UnitTests.GrainInterfaces; @@ -198,6 +199,7 @@ private async Task GetGrainAtSilo(SiloAddress silo) { while (true) { + RequestContext.Set(IPlacementDirector.PlacementHintKey, silo); IPlacementTestGrain grain = this.GrainFactory.GetGrain(Guid.NewGuid()); SiloAddress address = await grain.GetLocation(); if (address.Equals(silo))