From 11ce9ea076a3d008831fe0b238670cf08cae2253 Mon Sep 17 00:00:00 2001 From: Alexey Rodionov Date: Fri, 29 Jul 2022 10:36:49 -0700 Subject: [PATCH 1/8] [WebJobs] Target base scale --- build/common.props | 2 +- .../WebJobsServiceCollectionExtensions.cs | 2 + .../Scale/DynamicTargetValueProvider.cs | 246 ++++++++++++++++++ .../Scale/IDynamicTargetValueProvider.cs | 13 + .../Scale/ITargetScaleMonitor.cs | 18 ++ 5 files changed, 280 insertions(+), 1 deletion(-) create mode 100644 src/Microsoft.Azure.WebJobs.Host/Scale/DynamicTargetValueProvider.cs create mode 100644 src/Microsoft.Azure.WebJobs.Host/Scale/IDynamicTargetValueProvider.cs create mode 100644 src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScaleMonitor.cs diff --git a/build/common.props b/build/common.props index 66c94d835..9967ba4a6 100644 --- a/build/common.props +++ b/build/common.props @@ -1,7 +1,7 @@ - 3.0.34$(VersionSuffix) + 3.0.34-target-scale$(VersionSuffix) 5.0.0-beta.2$(VersionSuffix) 4.0.3$(VersionSuffix) diff --git a/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsServiceCollectionExtensions.cs b/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsServiceCollectionExtensions.cs index 48eb866aa..e6087a09d 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsServiceCollectionExtensions.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsServiceCollectionExtensions.cs @@ -88,6 +88,8 @@ public static IWebJobsBuilder AddWebJobs(this IServiceCollection services, Actio services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); + services.TryAddSingleton(); + services.AddSingleton(); services.AddSingleton(); diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/DynamicTargetValueProvider.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/DynamicTargetValueProvider.cs new file mode 100644 index 000000000..5f0269729 --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/DynamicTargetValueProvider.cs @@ -0,0 +1,246 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Concurrent; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.Azure.WebJobs.Host.Scale +{ + internal class DynamicTargetValueProvider : IDynamicTargetValueProvider + { + private const int DynamicConcurrencyStabilizationTimeInSeconds = 60; //default value in the scale controller + private const int CacheExpirationTimeDynamicConcurrencyDesiredMetricValueInSeconds = 60; //default value in the scale controller + private const int CacheExpirationGracePeriodInSeconds = 30; //default value in the scale controller + private const int DynamicConcurrencyStabilizationRange = 1; //default value in the scale controller + + private IConcurrencyStatusRepository _concurrencyStatusRepository; + private DateTime lastFunctionSnapshotCacheUpdateTime; + private ConcurrentDictionary dynamicConcurrencyFunctionCache; + private SemaphoreSlim dynamicConcurrencyCacheUpdateLock; + + public DynamicTargetValueProvider(IConcurrencyStatusRepository concurrencyStatusRepository) + { + _concurrencyStatusRepository = concurrencyStatusRepository; + lastFunctionSnapshotCacheUpdateTime = DateTime.MinValue; + dynamicConcurrencyFunctionCache = new ConcurrentDictionary(); + this.dynamicConcurrencyCacheUpdateLock = new SemaphoreSlim(1); + } + + public async Task GetDynamicTargetValue(string functionId, bool isDynamicConcurrencyEnabled) + { + int fallbackValue = -1; + if (isDynamicConcurrencyEnabled) + { + DateTime currentTime = DateTime.UtcNow; + FunctionSnapshotCacheEntry functionSnapshotCacheEntry; + if (IsCachedMetricValueValid(functionId, currentTime, out functionSnapshotCacheEntry)) + { + //ExtensionLogEventSource.Log.SiteInformation(site.Name, $"Using the cached desiredMetricValue of {functionSnapshotCacheEntry.MetricValue} for function {functionName}."); + return functionSnapshotCacheEntry.MetricValue; + } + + if (IsCachedMetricValidFallback(functionSnapshotCacheEntry, currentTime)) + { + //ExtensionLogEventSource.Log.SiteInformation(site.Name, $"Cached desiredMetricValue of {functionSnapshotCacheEntry.MetricValue} is valid as a fallback for function {functionName}."); + fallbackValue = functionSnapshotCacheEntry.MetricValue; + } + + if (_concurrencyStatusRepository == null) + { + if (dynamicConcurrencyCacheUpdateLock.Wait(0)) + { + try + { + if ((currentTime - lastFunctionSnapshotCacheUpdateTime).TotalSeconds < DynamicConcurrencyStabilizationTimeInSeconds) + { + //Only want each site to read from blob storage once every DynamicConcurrencyStabilizationTimeInSeconds + //ExtensionLogEventSource.Log.SiteWarning(site.Name, $"Site {site.Name} has already read from blob storage for dynamic concurrency value within the past {config.DynamicConcurrencyStabilizationTimeInSeconds} seconds. Last access to blob storage was at {lastFunctionSnapshotCacheUpdateTime}."); + return fallbackValue; + } + + //DynamicConcurrencyStatusBlob dynamicConcurrencyDataJson = await ParseJSONFromDynamicConcurrencyBlobAsync(); + HostConcurrencySnapshot hostConcurrencySnapshot = await _concurrencyStatusRepository.ReadAsync(CancellationToken.None); + + if (hostConcurrencySnapshot == null) + { + //ExtensionLogEventSource.Log.SiteWarning(site.Name, $"Unable to deserialize dynamic concurrency status blob."); + return fallbackValue; + } + + UpdateCacheForAllFunctions(hostConcurrencySnapshot, currentTime); + lastFunctionSnapshotCacheUpdateTime = currentTime; + + FunctionSnapshotCacheEntry newCacheEntry; + if (GetCachedFunctionSnapshotForFunction(functionId, out newCacheEntry) && newCacheEntry.MetricIsStable) + { + //ExtensionLogEventSource.Log.SiteInformation(context.AppName, $"Using stabilized value of {newCacheEntry.MetricValue} for function {functionName}"); + return newCacheEntry.MetricValue; + } + else + { + return fallbackValue; + } + } + catch // (Exception e) + { + //ExtensionLogEventSource.Log.SiteWarning(site.Name, $"Unable to read from the blob where dynamic concurrency data is stored for site {site.Name} due to exception {e}."); + } + finally + { + dynamicConcurrencyCacheUpdateLock.Release(); + } + } + } + else + { + //ExtensionLogEventSource.Log.SiteInformation(site.Name, $"Target Based Scaling Algorithm was unable to connect to the blob where dynamic concurrency data is stored for site {site.Name} upon instantiation."); + } + } + return fallbackValue; + + + // Returning fallback value since Dynamic Concurrency is not enabled + // fallbackValue can either be user-defined, or the default value for service bus and eventhub + //bool isDefault = fallbackValue == DefaultTargetEventHubMetric || fallbackValue == DefaultTargetServiceBusMetric; + //ExtensionLogEventSource.Log.SiteInformation(site.Name, $"Using {(isDefault ? "default" : "user-defined")} desiredMetricValue of {fallbackValue} for {triggerName} trigger"); + //return context.StaticTargetValue; + + //int desiredWorkerCount = (int)Math.Ceiling((decimal)targetContext.Metrics.Last().MessageCount / desiredMetricValue); + + //ExtensionLogEventSource.Log.SiteScaleVote(site.Name, queueTriggerId, scaleResult.ToString(), scaleReason); + //return desiredWorkerCount - context.WorkerCount; + } + + internal bool IsCachedMetricValidFallback(FunctionSnapshotCacheEntry cacheEntry, DateTime currentTime) + { + if (cacheEntry == null) + { + return false; + } + + if (!cacheEntry.MetricIsStable) + { + return false; + } + + double secondsExpired = (currentTime - cacheEntry.MetricCachedAt).TotalSeconds - CacheExpirationTimeDynamicConcurrencyDesiredMetricValueInSeconds; + + if (secondsExpired > CacheExpirationGracePeriodInSeconds) + { + return false; + } + + return true; + } + + internal bool IsCachedMetricValueValid(string functionName, DateTime currentTime, out FunctionSnapshotCacheEntry functionSnapshotCacheEntry) + { + if (dynamicConcurrencyFunctionCache.TryGetValue(functionName, out functionSnapshotCacheEntry)) + { + if (!functionSnapshotCacheEntry.MetricIsStable) + { + //ExtensionLogEventSource.Log.SiteInformation(site.Name, $"Cached desiredMetricValue of {functionSnapshotCacheEntry.MetricValue} is not stable for function {functionName}. " + + //$"Attempting to read a new dynamic concurrency desiredMetricValue from blob storage."); + return false; + } + + if ((currentTime - functionSnapshotCacheEntry.MetricCachedAt).TotalSeconds > CacheExpirationTimeDynamicConcurrencyDesiredMetricValueInSeconds) + { + //ExtensionLogEventSource.Log.SiteInformation(site.Name, $"Cached desiredMetricValue {functionSnapshotCacheEntry.MetricValue} has expired for function {functionName}. " + + //$"Attempting to read a new dynamic concurrency desiredMetricValue from blob storage."); + return false; + } + return true; + } + else + { + //ExtensionLogEventSource.Log.SiteInformation(site.Name, $"No cached desiredMetricValue exists for function {functionName}. " + + // $"Attempting to read a new dynamic concurrency desiredMetricValue from blob storage."); + return false; + } + } + + internal void UpdateSnapshotCacheForFunction(string functionName, int desiredMetricValue, DateTime currentTime, bool isStable) + { + FunctionSnapshotCacheEntry newFunctionSnapshot = new FunctionSnapshotCacheEntry(desiredMetricValue, currentTime, isStable); + + // In the time that this thread was running, another thread could have updated the dictionary with a more recently pulled desiredMetricValue. In that case, we don't want to + // populate the dictionary with our value, and in either case we just return whatever value is currently in the direction (post-population or not) + dynamicConcurrencyFunctionCache.AddOrUpdate(functionName, newFunctionSnapshot, + (key, currentFunctionSnapshot) => + currentFunctionSnapshot.MetricCachedAt < currentTime ? newFunctionSnapshot : currentFunctionSnapshot); + } + + internal void UpdateCacheForAllFunctions(HostConcurrencySnapshot dcBlob, DateTime currentTime) + { + if (dcBlob.FunctionSnapshots != null) + { + foreach (var fullFunctionName in dcBlob.FunctionSnapshots.Keys) + { + if (dcBlob.FunctionSnapshots.TryGetValue(fullFunctionName, out FunctionConcurrencySnapshot functionSnapshot)) + { + int newDesiredMetricValue = functionSnapshot.Concurrency; + string functionName = GetFunctionName(fullFunctionName); + bool isStable = IsNewMetricValueStableForFunction(functionName, newDesiredMetricValue); + UpdateSnapshotCacheForFunction(functionName, newDesiredMetricValue, currentTime, isStable); + } + } + } + } + + internal bool GetCachedFunctionSnapshotForFunction(string functionName, out FunctionSnapshotCacheEntry functionSnapshotCacheEntry) + { + return dynamicConcurrencyFunctionCache.TryGetValue(functionName, out functionSnapshotCacheEntry); + } + + internal bool IsNewMetricValueStableForFunction(string functionName, int newMetricValue) + { + if (newMetricValue < 0) + { + return false; + } + + if (!GetCachedFunctionSnapshotForFunction(functionName, out FunctionSnapshotCacheEntry oldEntry)) + { + return false; + } + + int oldMetricValue = oldEntry.MetricValue; + if (newMetricValue <= oldMetricValue + DynamicConcurrencyStabilizationRange && + newMetricValue >= oldMetricValue - DynamicConcurrencyStabilizationRange) + { + return true; + } + return false; + } + + internal static string GetFunctionName(string fullFunctionName) + { + if (string.IsNullOrEmpty(fullFunctionName)) + { + return ""; + } + return fullFunctionName.Split('.').Last(); + } + } +} + +internal class FunctionSnapshotCacheEntry +{ + internal int MetricValue { get; set; } + + internal DateTime MetricCachedAt { get; set; } + + internal bool MetricIsStable { get; set; } + + + public FunctionSnapshotCacheEntry(int metricValue, DateTime metricCachedAt, bool metricIsStable) + { + MetricValue = metricValue; + MetricCachedAt = metricCachedAt; + MetricIsStable = metricIsStable; + } +} \ No newline at end of file diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/IDynamicTargetValueProvider.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/IDynamicTargetValueProvider.cs new file mode 100644 index 000000000..665d11b60 --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/IDynamicTargetValueProvider.cs @@ -0,0 +1,13 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using Microsoft.Azure.WebJobs.Host.Executors; +using System.Threading.Tasks; + +namespace Microsoft.Azure.WebJobs.Host.Scale +{ + public interface IDynamicTargetValueProvider + { + Task GetDynamicTargetValue(string functionId, bool isDynamicConcurrencyEnabled); + } +} diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScaleMonitor.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScaleMonitor.cs new file mode 100644 index 000000000..f56ab2b0e --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScaleMonitor.cs @@ -0,0 +1,18 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. +using System.Threading.Tasks; + +namespace Microsoft.Azure.WebJobs.Host.Scale +{ + public interface ITargetScaleMonitor : IScaleMonitor + { + Task GetScaleVoteAsync(ScaleStatusContext context); + } + + public interface ITargetScaleMonitor : IScaleMonitor where TMetrics : ScaleMetrics + { + new Task GetMetricsAsync(); + + Task GetScaleVoteAsync(ScaleStatusContext context); + } +} From 614d5be891bc337c3226b40a761cb16a3263c349 Mon Sep 17 00:00:00 2001 From: Alexey Rodionov Date: Fri, 29 Jul 2022 14:38:21 -0700 Subject: [PATCH 2/8] Fix ITargetScaleMonitor --- src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScaleMonitor.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScaleMonitor.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScaleMonitor.cs index f56ab2b0e..ac2711069 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScaleMonitor.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScaleMonitor.cs @@ -9,7 +9,7 @@ public interface ITargetScaleMonitor : IScaleMonitor Task GetScaleVoteAsync(ScaleStatusContext context); } - public interface ITargetScaleMonitor : IScaleMonitor where TMetrics : ScaleMetrics + public interface ITargetScaleMonitor : ITargetScaleMonitor where TMetrics : ScaleMetrics { new Task GetMetricsAsync(); From 07970dabd1f417db6d0f6e0b1354c2fe477a8050 Mon Sep 17 00:00:00 2001 From: Alexey Rodionov Date: Fri, 29 Jul 2022 10:36:49 -0700 Subject: [PATCH 3/8] [WebJobs] Target base scale --- build/common.props | 2 +- .../WebJobsServiceCollectionExtensions.cs | 2 + .../Scale/DynamicTargetValueProvider.cs | 246 ++++++++++++++++++ .../Scale/IDynamicTargetValueProvider.cs | 13 + .../Scale/ITargetScaleMonitor.cs | 18 ++ 5 files changed, 280 insertions(+), 1 deletion(-) create mode 100644 src/Microsoft.Azure.WebJobs.Host/Scale/DynamicTargetValueProvider.cs create mode 100644 src/Microsoft.Azure.WebJobs.Host/Scale/IDynamicTargetValueProvider.cs create mode 100644 src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScaleMonitor.cs diff --git a/build/common.props b/build/common.props index 66c94d835..9967ba4a6 100644 --- a/build/common.props +++ b/build/common.props @@ -1,7 +1,7 @@ - 3.0.34$(VersionSuffix) + 3.0.34-target-scale$(VersionSuffix) 5.0.0-beta.2$(VersionSuffix) 4.0.3$(VersionSuffix) diff --git a/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsServiceCollectionExtensions.cs b/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsServiceCollectionExtensions.cs index 48eb866aa..e6087a09d 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsServiceCollectionExtensions.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsServiceCollectionExtensions.cs @@ -88,6 +88,8 @@ public static IWebJobsBuilder AddWebJobs(this IServiceCollection services, Actio services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); + services.TryAddSingleton(); + services.AddSingleton(); services.AddSingleton(); diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/DynamicTargetValueProvider.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/DynamicTargetValueProvider.cs new file mode 100644 index 000000000..5f0269729 --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/DynamicTargetValueProvider.cs @@ -0,0 +1,246 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Concurrent; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.Azure.WebJobs.Host.Scale +{ + internal class DynamicTargetValueProvider : IDynamicTargetValueProvider + { + private const int DynamicConcurrencyStabilizationTimeInSeconds = 60; //default value in the scale controller + private const int CacheExpirationTimeDynamicConcurrencyDesiredMetricValueInSeconds = 60; //default value in the scale controller + private const int CacheExpirationGracePeriodInSeconds = 30; //default value in the scale controller + private const int DynamicConcurrencyStabilizationRange = 1; //default value in the scale controller + + private IConcurrencyStatusRepository _concurrencyStatusRepository; + private DateTime lastFunctionSnapshotCacheUpdateTime; + private ConcurrentDictionary dynamicConcurrencyFunctionCache; + private SemaphoreSlim dynamicConcurrencyCacheUpdateLock; + + public DynamicTargetValueProvider(IConcurrencyStatusRepository concurrencyStatusRepository) + { + _concurrencyStatusRepository = concurrencyStatusRepository; + lastFunctionSnapshotCacheUpdateTime = DateTime.MinValue; + dynamicConcurrencyFunctionCache = new ConcurrentDictionary(); + this.dynamicConcurrencyCacheUpdateLock = new SemaphoreSlim(1); + } + + public async Task GetDynamicTargetValue(string functionId, bool isDynamicConcurrencyEnabled) + { + int fallbackValue = -1; + if (isDynamicConcurrencyEnabled) + { + DateTime currentTime = DateTime.UtcNow; + FunctionSnapshotCacheEntry functionSnapshotCacheEntry; + if (IsCachedMetricValueValid(functionId, currentTime, out functionSnapshotCacheEntry)) + { + //ExtensionLogEventSource.Log.SiteInformation(site.Name, $"Using the cached desiredMetricValue of {functionSnapshotCacheEntry.MetricValue} for function {functionName}."); + return functionSnapshotCacheEntry.MetricValue; + } + + if (IsCachedMetricValidFallback(functionSnapshotCacheEntry, currentTime)) + { + //ExtensionLogEventSource.Log.SiteInformation(site.Name, $"Cached desiredMetricValue of {functionSnapshotCacheEntry.MetricValue} is valid as a fallback for function {functionName}."); + fallbackValue = functionSnapshotCacheEntry.MetricValue; + } + + if (_concurrencyStatusRepository == null) + { + if (dynamicConcurrencyCacheUpdateLock.Wait(0)) + { + try + { + if ((currentTime - lastFunctionSnapshotCacheUpdateTime).TotalSeconds < DynamicConcurrencyStabilizationTimeInSeconds) + { + //Only want each site to read from blob storage once every DynamicConcurrencyStabilizationTimeInSeconds + //ExtensionLogEventSource.Log.SiteWarning(site.Name, $"Site {site.Name} has already read from blob storage for dynamic concurrency value within the past {config.DynamicConcurrencyStabilizationTimeInSeconds} seconds. Last access to blob storage was at {lastFunctionSnapshotCacheUpdateTime}."); + return fallbackValue; + } + + //DynamicConcurrencyStatusBlob dynamicConcurrencyDataJson = await ParseJSONFromDynamicConcurrencyBlobAsync(); + HostConcurrencySnapshot hostConcurrencySnapshot = await _concurrencyStatusRepository.ReadAsync(CancellationToken.None); + + if (hostConcurrencySnapshot == null) + { + //ExtensionLogEventSource.Log.SiteWarning(site.Name, $"Unable to deserialize dynamic concurrency status blob."); + return fallbackValue; + } + + UpdateCacheForAllFunctions(hostConcurrencySnapshot, currentTime); + lastFunctionSnapshotCacheUpdateTime = currentTime; + + FunctionSnapshotCacheEntry newCacheEntry; + if (GetCachedFunctionSnapshotForFunction(functionId, out newCacheEntry) && newCacheEntry.MetricIsStable) + { + //ExtensionLogEventSource.Log.SiteInformation(context.AppName, $"Using stabilized value of {newCacheEntry.MetricValue} for function {functionName}"); + return newCacheEntry.MetricValue; + } + else + { + return fallbackValue; + } + } + catch // (Exception e) + { + //ExtensionLogEventSource.Log.SiteWarning(site.Name, $"Unable to read from the blob where dynamic concurrency data is stored for site {site.Name} due to exception {e}."); + } + finally + { + dynamicConcurrencyCacheUpdateLock.Release(); + } + } + } + else + { + //ExtensionLogEventSource.Log.SiteInformation(site.Name, $"Target Based Scaling Algorithm was unable to connect to the blob where dynamic concurrency data is stored for site {site.Name} upon instantiation."); + } + } + return fallbackValue; + + + // Returning fallback value since Dynamic Concurrency is not enabled + // fallbackValue can either be user-defined, or the default value for service bus and eventhub + //bool isDefault = fallbackValue == DefaultTargetEventHubMetric || fallbackValue == DefaultTargetServiceBusMetric; + //ExtensionLogEventSource.Log.SiteInformation(site.Name, $"Using {(isDefault ? "default" : "user-defined")} desiredMetricValue of {fallbackValue} for {triggerName} trigger"); + //return context.StaticTargetValue; + + //int desiredWorkerCount = (int)Math.Ceiling((decimal)targetContext.Metrics.Last().MessageCount / desiredMetricValue); + + //ExtensionLogEventSource.Log.SiteScaleVote(site.Name, queueTriggerId, scaleResult.ToString(), scaleReason); + //return desiredWorkerCount - context.WorkerCount; + } + + internal bool IsCachedMetricValidFallback(FunctionSnapshotCacheEntry cacheEntry, DateTime currentTime) + { + if (cacheEntry == null) + { + return false; + } + + if (!cacheEntry.MetricIsStable) + { + return false; + } + + double secondsExpired = (currentTime - cacheEntry.MetricCachedAt).TotalSeconds - CacheExpirationTimeDynamicConcurrencyDesiredMetricValueInSeconds; + + if (secondsExpired > CacheExpirationGracePeriodInSeconds) + { + return false; + } + + return true; + } + + internal bool IsCachedMetricValueValid(string functionName, DateTime currentTime, out FunctionSnapshotCacheEntry functionSnapshotCacheEntry) + { + if (dynamicConcurrencyFunctionCache.TryGetValue(functionName, out functionSnapshotCacheEntry)) + { + if (!functionSnapshotCacheEntry.MetricIsStable) + { + //ExtensionLogEventSource.Log.SiteInformation(site.Name, $"Cached desiredMetricValue of {functionSnapshotCacheEntry.MetricValue} is not stable for function {functionName}. " + + //$"Attempting to read a new dynamic concurrency desiredMetricValue from blob storage."); + return false; + } + + if ((currentTime - functionSnapshotCacheEntry.MetricCachedAt).TotalSeconds > CacheExpirationTimeDynamicConcurrencyDesiredMetricValueInSeconds) + { + //ExtensionLogEventSource.Log.SiteInformation(site.Name, $"Cached desiredMetricValue {functionSnapshotCacheEntry.MetricValue} has expired for function {functionName}. " + + //$"Attempting to read a new dynamic concurrency desiredMetricValue from blob storage."); + return false; + } + return true; + } + else + { + //ExtensionLogEventSource.Log.SiteInformation(site.Name, $"No cached desiredMetricValue exists for function {functionName}. " + + // $"Attempting to read a new dynamic concurrency desiredMetricValue from blob storage."); + return false; + } + } + + internal void UpdateSnapshotCacheForFunction(string functionName, int desiredMetricValue, DateTime currentTime, bool isStable) + { + FunctionSnapshotCacheEntry newFunctionSnapshot = new FunctionSnapshotCacheEntry(desiredMetricValue, currentTime, isStable); + + // In the time that this thread was running, another thread could have updated the dictionary with a more recently pulled desiredMetricValue. In that case, we don't want to + // populate the dictionary with our value, and in either case we just return whatever value is currently in the direction (post-population or not) + dynamicConcurrencyFunctionCache.AddOrUpdate(functionName, newFunctionSnapshot, + (key, currentFunctionSnapshot) => + currentFunctionSnapshot.MetricCachedAt < currentTime ? newFunctionSnapshot : currentFunctionSnapshot); + } + + internal void UpdateCacheForAllFunctions(HostConcurrencySnapshot dcBlob, DateTime currentTime) + { + if (dcBlob.FunctionSnapshots != null) + { + foreach (var fullFunctionName in dcBlob.FunctionSnapshots.Keys) + { + if (dcBlob.FunctionSnapshots.TryGetValue(fullFunctionName, out FunctionConcurrencySnapshot functionSnapshot)) + { + int newDesiredMetricValue = functionSnapshot.Concurrency; + string functionName = GetFunctionName(fullFunctionName); + bool isStable = IsNewMetricValueStableForFunction(functionName, newDesiredMetricValue); + UpdateSnapshotCacheForFunction(functionName, newDesiredMetricValue, currentTime, isStable); + } + } + } + } + + internal bool GetCachedFunctionSnapshotForFunction(string functionName, out FunctionSnapshotCacheEntry functionSnapshotCacheEntry) + { + return dynamicConcurrencyFunctionCache.TryGetValue(functionName, out functionSnapshotCacheEntry); + } + + internal bool IsNewMetricValueStableForFunction(string functionName, int newMetricValue) + { + if (newMetricValue < 0) + { + return false; + } + + if (!GetCachedFunctionSnapshotForFunction(functionName, out FunctionSnapshotCacheEntry oldEntry)) + { + return false; + } + + int oldMetricValue = oldEntry.MetricValue; + if (newMetricValue <= oldMetricValue + DynamicConcurrencyStabilizationRange && + newMetricValue >= oldMetricValue - DynamicConcurrencyStabilizationRange) + { + return true; + } + return false; + } + + internal static string GetFunctionName(string fullFunctionName) + { + if (string.IsNullOrEmpty(fullFunctionName)) + { + return ""; + } + return fullFunctionName.Split('.').Last(); + } + } +} + +internal class FunctionSnapshotCacheEntry +{ + internal int MetricValue { get; set; } + + internal DateTime MetricCachedAt { get; set; } + + internal bool MetricIsStable { get; set; } + + + public FunctionSnapshotCacheEntry(int metricValue, DateTime metricCachedAt, bool metricIsStable) + { + MetricValue = metricValue; + MetricCachedAt = metricCachedAt; + MetricIsStable = metricIsStable; + } +} \ No newline at end of file diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/IDynamicTargetValueProvider.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/IDynamicTargetValueProvider.cs new file mode 100644 index 000000000..665d11b60 --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/IDynamicTargetValueProvider.cs @@ -0,0 +1,13 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using Microsoft.Azure.WebJobs.Host.Executors; +using System.Threading.Tasks; + +namespace Microsoft.Azure.WebJobs.Host.Scale +{ + public interface IDynamicTargetValueProvider + { + Task GetDynamicTargetValue(string functionId, bool isDynamicConcurrencyEnabled); + } +} diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScaleMonitor.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScaleMonitor.cs new file mode 100644 index 000000000..f56ab2b0e --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScaleMonitor.cs @@ -0,0 +1,18 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. +using System.Threading.Tasks; + +namespace Microsoft.Azure.WebJobs.Host.Scale +{ + public interface ITargetScaleMonitor : IScaleMonitor + { + Task GetScaleVoteAsync(ScaleStatusContext context); + } + + public interface ITargetScaleMonitor : IScaleMonitor where TMetrics : ScaleMetrics + { + new Task GetMetricsAsync(); + + Task GetScaleVoteAsync(ScaleStatusContext context); + } +} From e37d5bf17d6732e737c4adeb1c96baf932755665 Mon Sep 17 00:00:00 2001 From: Alexey Rodionov Date: Fri, 29 Jul 2022 14:38:21 -0700 Subject: [PATCH 4/8] Fix ITargetScaleMonitor --- src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScaleMonitor.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScaleMonitor.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScaleMonitor.cs index f56ab2b0e..ac2711069 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScaleMonitor.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScaleMonitor.cs @@ -9,7 +9,7 @@ public interface ITargetScaleMonitor : IScaleMonitor Task GetScaleVoteAsync(ScaleStatusContext context); } - public interface ITargetScaleMonitor : IScaleMonitor where TMetrics : ScaleMetrics + public interface ITargetScaleMonitor : ITargetScaleMonitor where TMetrics : ScaleMetrics { new Task GetMetricsAsync(); From a64bd5641e315374380c456f04d1f810adf882b9 Mon Sep 17 00:00:00 2001 From: Alexey Rodionov Date: Thu, 25 Aug 2022 18:03:54 -0700 Subject: [PATCH 5/8] Few fixes and adding tests. --- .../WebJobsServiceCollectionExtensions.cs | 2 +- .../Scale/DynamicTargetValueProvider.cs | 265 ++++-------------- .../Scale/IDynamicTargetValueProvider.cs | 2 +- .../Scale/DynamicTargetValueProviderTest.cs | 93 ++++++ 4 files changed, 156 insertions(+), 206 deletions(-) create mode 100644 test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/DynamicTargetValueProviderTest.cs diff --git a/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsServiceCollectionExtensions.cs b/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsServiceCollectionExtensions.cs index e6087a09d..b646268f9 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsServiceCollectionExtensions.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsServiceCollectionExtensions.cs @@ -88,7 +88,6 @@ public static IWebJobsBuilder AddWebJobs(this IServiceCollection services, Actio services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); - services.TryAddSingleton(); services.AddSingleton(); @@ -131,6 +130,7 @@ public static IWebJobsBuilder AddWebJobs(this IServiceCollection services, Actio services.TryAddEnumerable(ServiceDescriptor.Singleton()); services.TryAddSingleton(); services.TryAddEnumerable(ServiceDescriptor.Singleton()); + services.TryAddSingleton(); services.ConfigureOptions(); services.ConfigureOptions(); diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/DynamicTargetValueProvider.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/DynamicTargetValueProvider.cs index 5f0269729..f66d14e7f 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Scale/DynamicTargetValueProvider.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/DynamicTargetValueProvider.cs @@ -1,9 +1,9 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. +using Microsoft.Extensions.Logging; +using Newtonsoft.Json; using System; -using System.Collections.Concurrent; -using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -11,236 +11,93 @@ namespace Microsoft.Azure.WebJobs.Host.Scale { internal class DynamicTargetValueProvider : IDynamicTargetValueProvider { - private const int DynamicConcurrencyStabilizationTimeInSeconds = 60; //default value in the scale controller - private const int CacheExpirationTimeDynamicConcurrencyDesiredMetricValueInSeconds = 60; //default value in the scale controller - private const int CacheExpirationGracePeriodInSeconds = 30; //default value in the scale controller - private const int DynamicConcurrencyStabilizationRange = 1; //default value in the scale controller - + private TimeSpan _cachedInterval = TimeSpan.FromSeconds(30); + private TimeSpan _expiredSnapshotInterval = TimeSpan.FromSeconds(60); + private DateTime _lastSnapshotRead = DateTime.MinValue; + private HostConcurrencySnapshot _lastSnapshot; private IConcurrencyStatusRepository _concurrencyStatusRepository; - private DateTime lastFunctionSnapshotCacheUpdateTime; - private ConcurrentDictionary dynamicConcurrencyFunctionCache; - private SemaphoreSlim dynamicConcurrencyCacheUpdateLock; + private readonly ILogger _logger; + static SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1, 1); - public DynamicTargetValueProvider(IConcurrencyStatusRepository concurrencyStatusRepository) + public DynamicTargetValueProvider(IConcurrencyStatusRepository concurrencyStatusRepository, ILoggerFactory loggerFactory) { _concurrencyStatusRepository = concurrencyStatusRepository; - lastFunctionSnapshotCacheUpdateTime = DateTime.MinValue; - dynamicConcurrencyFunctionCache = new ConcurrentDictionary(); - this.dynamicConcurrencyCacheUpdateLock = new SemaphoreSlim(1); + _logger = loggerFactory.CreateLogger(); } - public async Task GetDynamicTargetValue(string functionId, bool isDynamicConcurrencyEnabled) + // for tests + internal DynamicTargetValueProvider(IConcurrencyStatusRepository concurrencyStatusRepository, ILoggerFactory loggerFactory, + TimeSpan cachedInterval, TimeSpan expiredSnapshotInterval) : this(concurrencyStatusRepository, loggerFactory) { - int fallbackValue = -1; - if (isDynamicConcurrencyEnabled) - { - DateTime currentTime = DateTime.UtcNow; - FunctionSnapshotCacheEntry functionSnapshotCacheEntry; - if (IsCachedMetricValueValid(functionId, currentTime, out functionSnapshotCacheEntry)) - { - //ExtensionLogEventSource.Log.SiteInformation(site.Name, $"Using the cached desiredMetricValue of {functionSnapshotCacheEntry.MetricValue} for function {functionName}."); - return functionSnapshotCacheEntry.MetricValue; - } - - if (IsCachedMetricValidFallback(functionSnapshotCacheEntry, currentTime)) - { - //ExtensionLogEventSource.Log.SiteInformation(site.Name, $"Cached desiredMetricValue of {functionSnapshotCacheEntry.MetricValue} is valid as a fallback for function {functionName}."); - fallbackValue = functionSnapshotCacheEntry.MetricValue; - } - - if (_concurrencyStatusRepository == null) - { - if (dynamicConcurrencyCacheUpdateLock.Wait(0)) - { - try - { - if ((currentTime - lastFunctionSnapshotCacheUpdateTime).TotalSeconds < DynamicConcurrencyStabilizationTimeInSeconds) - { - //Only want each site to read from blob storage once every DynamicConcurrencyStabilizationTimeInSeconds - //ExtensionLogEventSource.Log.SiteWarning(site.Name, $"Site {site.Name} has already read from blob storage for dynamic concurrency value within the past {config.DynamicConcurrencyStabilizationTimeInSeconds} seconds. Last access to blob storage was at {lastFunctionSnapshotCacheUpdateTime}."); - return fallbackValue; - } - - //DynamicConcurrencyStatusBlob dynamicConcurrencyDataJson = await ParseJSONFromDynamicConcurrencyBlobAsync(); - HostConcurrencySnapshot hostConcurrencySnapshot = await _concurrencyStatusRepository.ReadAsync(CancellationToken.None); - - if (hostConcurrencySnapshot == null) - { - //ExtensionLogEventSource.Log.SiteWarning(site.Name, $"Unable to deserialize dynamic concurrency status blob."); - return fallbackValue; - } - - UpdateCacheForAllFunctions(hostConcurrencySnapshot, currentTime); - lastFunctionSnapshotCacheUpdateTime = currentTime; - - FunctionSnapshotCacheEntry newCacheEntry; - if (GetCachedFunctionSnapshotForFunction(functionId, out newCacheEntry) && newCacheEntry.MetricIsStable) - { - //ExtensionLogEventSource.Log.SiteInformation(context.AppName, $"Using stabilized value of {newCacheEntry.MetricValue} for function {functionName}"); - return newCacheEntry.MetricValue; - } - else - { - return fallbackValue; - } - } - catch // (Exception e) - { - //ExtensionLogEventSource.Log.SiteWarning(site.Name, $"Unable to read from the blob where dynamic concurrency data is stored for site {site.Name} due to exception {e}."); - } - finally - { - dynamicConcurrencyCacheUpdateLock.Release(); - } - } - } - else - { - //ExtensionLogEventSource.Log.SiteInformation(site.Name, $"Target Based Scaling Algorithm was unable to connect to the blob where dynamic concurrency data is stored for site {site.Name} upon instantiation."); - } - } - return fallbackValue; - - - // Returning fallback value since Dynamic Concurrency is not enabled - // fallbackValue can either be user-defined, or the default value for service bus and eventhub - //bool isDefault = fallbackValue == DefaultTargetEventHubMetric || fallbackValue == DefaultTargetServiceBusMetric; - //ExtensionLogEventSource.Log.SiteInformation(site.Name, $"Using {(isDefault ? "default" : "user-defined")} desiredMetricValue of {fallbackValue} for {triggerName} trigger"); - //return context.StaticTargetValue; - - //int desiredWorkerCount = (int)Math.Ceiling((decimal)targetContext.Metrics.Last().MessageCount / desiredMetricValue); - - //ExtensionLogEventSource.Log.SiteScaleVote(site.Name, queueTriggerId, scaleResult.ToString(), scaleReason); - //return desiredWorkerCount - context.WorkerCount; + _cachedInterval = cachedInterval; + _expiredSnapshotInterval = expiredSnapshotInterval; } - internal bool IsCachedMetricValidFallback(FunctionSnapshotCacheEntry cacheEntry, DateTime currentTime) + // for tests + public IConcurrencyStatusRepository ConcurrencyStatusRepository { - if (cacheEntry == null) - { - return false; - } - - if (!cacheEntry.MetricIsStable) - { - return false; - } - - double secondsExpired = (currentTime - cacheEntry.MetricCachedAt).TotalSeconds - CacheExpirationTimeDynamicConcurrencyDesiredMetricValueInSeconds; - - if (secondsExpired > CacheExpirationGracePeriodInSeconds) - { - return false; - } - - return true; + set => _concurrencyStatusRepository = value; } - internal bool IsCachedMetricValueValid(string functionName, DateTime currentTime, out FunctionSnapshotCacheEntry functionSnapshotCacheEntry) + // for tests + public DateTime LastSnapshotRead { - if (dynamicConcurrencyFunctionCache.TryGetValue(functionName, out functionSnapshotCacheEntry)) - { - if (!functionSnapshotCacheEntry.MetricIsStable) - { - //ExtensionLogEventSource.Log.SiteInformation(site.Name, $"Cached desiredMetricValue of {functionSnapshotCacheEntry.MetricValue} is not stable for function {functionName}. " + - //$"Attempting to read a new dynamic concurrency desiredMetricValue from blob storage."); - return false; - } - - if ((currentTime - functionSnapshotCacheEntry.MetricCachedAt).TotalSeconds > CacheExpirationTimeDynamicConcurrencyDesiredMetricValueInSeconds) - { - //ExtensionLogEventSource.Log.SiteInformation(site.Name, $"Cached desiredMetricValue {functionSnapshotCacheEntry.MetricValue} has expired for function {functionName}. " + - //$"Attempting to read a new dynamic concurrency desiredMetricValue from blob storage."); - return false; - } - return true; - } - else - { - //ExtensionLogEventSource.Log.SiteInformation(site.Name, $"No cached desiredMetricValue exists for function {functionName}. " + - // $"Attempting to read a new dynamic concurrency desiredMetricValue from blob storage."); - return false; - } + set => _lastSnapshotRead = value; } - internal void UpdateSnapshotCacheForFunction(string functionName, int desiredMetricValue, DateTime currentTime, bool isStable) + public async Task GetDynamicTargetValueAsync(string functionId) { - FunctionSnapshotCacheEntry newFunctionSnapshot = new FunctionSnapshotCacheEntry(desiredMetricValue, currentTime, isStable); + DateTime now = DateTime.UtcNow; + int fallback = -1; // default fallback - // In the time that this thread was running, another thread could have updated the dictionary with a more recently pulled desiredMetricValue. In that case, we don't want to - // populate the dictionary with our value, and in either case we just return whatever value is currently in the direction (post-population or not) - dynamicConcurrencyFunctionCache.AddOrUpdate(functionName, newFunctionSnapshot, - (key, currentFunctionSnapshot) => - currentFunctionSnapshot.MetricCachedAt < currentTime ? newFunctionSnapshot : currentFunctionSnapshot); - } - - internal void UpdateCacheForAllFunctions(HostConcurrencySnapshot dcBlob, DateTime currentTime) - { - if (dcBlob.FunctionSnapshots != null) + _logger.LogDebug($"Getting dynamic target value for function '{functionId}'"); + await _semaphoreSlim.WaitAsync(); + try { - foreach (var fullFunctionName in dcBlob.FunctionSnapshots.Keys) + if (_concurrencyStatusRepository == null) + { + _logger.LogDebug($"Returning fallback. Snapshot repository does not exists."); + } + else { - if (dcBlob.FunctionSnapshots.TryGetValue(fullFunctionName, out FunctionConcurrencySnapshot functionSnapshot)) + // Update host snaphsot if cache is expired or snapshot is expired + if (now - _lastSnapshotRead > _cachedInterval) { - int newDesiredMetricValue = functionSnapshot.Concurrency; - string functionName = GetFunctionName(fullFunctionName); - bool isStable = IsNewMetricValueStableForFunction(functionName, newDesiredMetricValue); - UpdateSnapshotCacheForFunction(functionName, newDesiredMetricValue, currentTime, isStable); + _lastSnapshotRead = now; + _lastSnapshot = await _concurrencyStatusRepository.ReadAsync(CancellationToken.None); // TODO: CancellationToken.None + _logger.LogDebug($"Snapshot is updated: '{JsonConvert.SerializeObject(_lastSnapshot)}'"); } - } - } - } - internal bool GetCachedFunctionSnapshotForFunction(string functionName, out FunctionSnapshotCacheEntry functionSnapshotCacheEntry) - { - return dynamicConcurrencyFunctionCache.TryGetValue(functionName, out functionSnapshotCacheEntry); - } - - internal bool IsNewMetricValueStableForFunction(string functionName, int newMetricValue) - { - if (newMetricValue < 0) - { - return false; + // Return value from + if (_lastSnapshot != null && now - _lastSnapshot.Timestamp < _expiredSnapshotInterval) + { + if (_lastSnapshot.FunctionSnapshots.TryGetValue(functionId, out FunctionConcurrencySnapshot functionConcurrencySnapshot)) + { + _logger.LogDebug($"Returning '{functionConcurrencySnapshot.Concurrency}' as target value for function '{functionId}'"); + return functionConcurrencySnapshot.Concurrency; + } + else + { + _logger.LogDebug($"Returning fallback. Snapshot for the function '{functionId}' is not found."); + } + } + else + { + _logger.LogDebug($"Returning fallback. Snapshot for the host is expired or not found."); + } + } } - - if (!GetCachedFunctionSnapshotForFunction(functionName, out FunctionSnapshotCacheEntry oldEntry)) + catch (Exception ex) { - return false; + // in case of an exception return fallback + _logger.LogError($"Returning fallback. An exception was occured during reading dynamic target value for the funciton '{functionId}' due to exception {ex}."); } - - int oldMetricValue = oldEntry.MetricValue; - if (newMetricValue <= oldMetricValue + DynamicConcurrencyStabilizationRange && - newMetricValue >= oldMetricValue - DynamicConcurrencyStabilizationRange) + finally { - return true; + _semaphoreSlim.Release(); } - return false; - } - internal static string GetFunctionName(string fullFunctionName) - { - if (string.IsNullOrEmpty(fullFunctionName)) - { - return ""; - } - return fullFunctionName.Split('.').Last(); + return fallback; } } -} - -internal class FunctionSnapshotCacheEntry -{ - internal int MetricValue { get; set; } - - internal DateTime MetricCachedAt { get; set; } - - internal bool MetricIsStable { get; set; } - - - public FunctionSnapshotCacheEntry(int metricValue, DateTime metricCachedAt, bool metricIsStable) - { - MetricValue = metricValue; - MetricCachedAt = metricCachedAt; - MetricIsStable = metricIsStable; - } } \ No newline at end of file diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/IDynamicTargetValueProvider.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/IDynamicTargetValueProvider.cs index 665d11b60..331afc979 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Scale/IDynamicTargetValueProvider.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/IDynamicTargetValueProvider.cs @@ -8,6 +8,6 @@ namespace Microsoft.Azure.WebJobs.Host.Scale { public interface IDynamicTargetValueProvider { - Task GetDynamicTargetValue(string functionId, bool isDynamicConcurrencyEnabled); + Task GetDynamicTargetValueAsync(string functionId); } } diff --git a/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/DynamicTargetValueProviderTest.cs b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/DynamicTargetValueProviderTest.cs new file mode 100644 index 000000000..db35e65ee --- /dev/null +++ b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/DynamicTargetValueProviderTest.cs @@ -0,0 +1,93 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using Microsoft.Azure.WebJobs.Host.TestCommon; +using System.Threading.Tasks; +using System; +using Xunit; +using Microsoft.Azure.WebJobs.Host.Scale; +using Moq; +using Microsoft.Extensions.Primitives; +using System.Threading; +using System.Collections.Generic; +using Azure.Core; +using Microsoft.Extensions.Logging; +using System.Linq; + +namespace Microsoft.Azure.WebJobs.Host.UnitTests.Scale +{ + [Trait(TestTraits.CategoryTraitName, TestTraits.DynamicConcurrency)] + public class DynamicTargetValueProviderTest + { + private readonly ILoggerFactory _loggerFactory; + private readonly TestLoggerProvider _loggerProvier; + + public DynamicTargetValueProviderTest() + { + _loggerFactory = new LoggerFactory(); + _loggerProvier = new TestLoggerProvider(); + _loggerFactory.AddProvider(_loggerProvier); + } + + [Fact] + public async Task GetDynamicTargetValue_NoRepository() + { + DynamicTargetValueProvider dynamicTargetValueProvider = new DynamicTargetValueProvider(null, _loggerFactory); + int result = await dynamicTargetValueProvider.GetDynamicTargetValueAsync("func1"); + Assert.Contains("Returning fallback. Snapshot repository does not exists.", _loggerProvier.GetAllLogMessages().Select(x => x.FormattedMessage)); + Assert.Equal(result, -1); + } + + [Theory] + [InlineData(60, 60, "func1", 1)] // getting value from cache + [InlineData(60, 45, "func1", 1)] // getting value from cache + [InlineData(60, 35, "func1", -1)] // snapshot is expired + [InlineData(20, 60, "func1", 4)] // getting value from last snapshot + [InlineData(60, 60, "func4", -1)] // funciton is not found in the spapshot + public async Task GetDynamicTargetValue_ReturnsExpected( + int cachedIntervalInSec, + int expiredSnapshotIntervalInSec, + string functionName, + int expectedConcurrency) + { + // Set last snapshot + IConcurrencyStatusRepository repo = CreateConcurrencyStatusRepository(DateTime.Now.AddSeconds(-40), + new string[] { "func1", "func2", "func3" }, + new int[] { 1, 2, 3 }); + DynamicTargetValueProvider dynamicTargetValueProvider = new DynamicTargetValueProvider(repo, _loggerFactory, + TimeSpan.FromSeconds(cachedIntervalInSec), TimeSpan.FromSeconds(expiredSnapshotIntervalInSec)); + await dynamicTargetValueProvider.GetDynamicTargetValueAsync(functionName); + dynamicTargetValueProvider.LastSnapshotRead = DateTime.Now.AddSeconds(-30); + + DateTime now = DateTime.Now; + + repo = CreateConcurrencyStatusRepository(DateTime.Now.AddSeconds(-20), + new string[] { "func1", "func2", "func3" }, + new int[] { 4, 5, 6 }); + dynamicTargetValueProvider.ConcurrencyStatusRepository = repo; + int result = await dynamicTargetValueProvider.GetDynamicTargetValueAsync(functionName); + + var logs = _loggerProvier.GetAllLogMessages().ToArray(); + Assert.Equal(result, expectedConcurrency); + } + + private IConcurrencyStatusRepository CreateConcurrencyStatusRepository(DateTime timestamp, string[] functionNames, int[] concurrecnies) + { + HostConcurrencySnapshot hostConcurrencySnapshot = new HostConcurrencySnapshot() + { + Timestamp = timestamp + }; + + var functionSnapshots = new Dictionary(); + for (int i = 0; i < functionNames.Length; i++) + { + functionSnapshots[functionNames[i]] = new FunctionConcurrencySnapshot() { Concurrency = concurrecnies[i] }; + } + hostConcurrencySnapshot.FunctionSnapshots = functionSnapshots; + + Mock mockoncurrencyStatusRepository = new Mock(); + mockoncurrencyStatusRepository.Setup(x => x.ReadAsync(It.IsAny())).ReturnsAsync(hostConcurrencySnapshot); + return mockoncurrencyStatusRepository.Object; + } + } +} From b918973e63d6787d884c60d3b7703d4c3f9e47cb Mon Sep 17 00:00:00 2001 From: Alexey Rodionov Date: Thu, 25 Aug 2022 18:08:00 -0700 Subject: [PATCH 6/8] Bumping version --- build/common.props | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build/common.props b/build/common.props index 9967ba4a6..a13b7fd1b 100644 --- a/build/common.props +++ b/build/common.props @@ -1,7 +1,7 @@ - 3.0.34-target-scale$(VersionSuffix) + 3.0.35$(VersionSuffix) 5.0.0-beta.2$(VersionSuffix) 4.0.3$(VersionSuffix) From f858e85ca7e71db71fc4eaca1515d55e48bbbceb Mon Sep 17 00:00:00 2001 From: Alexey Rodionov Date: Mon, 29 Aug 2022 11:33:35 -0700 Subject: [PATCH 7/8] Adding public types --- .../PublicSurfaceTests.cs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/test/Microsoft.Azure.WebJobs.Host.UnitTests/PublicSurfaceTests.cs b/test/Microsoft.Azure.WebJobs.Host.UnitTests/PublicSurfaceTests.cs index 27021688d..55df3a426 100644 --- a/test/Microsoft.Azure.WebJobs.Host.UnitTests/PublicSurfaceTests.cs +++ b/test/Microsoft.Azure.WebJobs.Host.UnitTests/PublicSurfaceTests.cs @@ -297,7 +297,10 @@ public void WebJobs_Host_VerifyPublicSurfaceArea() "FunctionActivityStatus", "IFunctionActivityStatusProvider", "SupportsRetryAttribute", - "AppServicesHostingUtility" + "AppServicesHostingUtility", + "IDynamicTargetValueProvider", + "ITargetScaleMonitor", + "ITargetScaleMonitor`1" }; TestHelpers.AssertPublicTypes(expected, assembly); From 4d1a5da78a2c2792272f9018e8e62ea9bfc2dee8 Mon Sep 17 00:00:00 2001 From: Alexey Rodionov Date: Wed, 7 Sep 2022 14:29:22 -0700 Subject: [PATCH 8/8] Fixing comments --- build/common.props | 2 +- .../WebJobsServiceCollectionExtensions.cs | 2 - .../Scale/DynamicTargetValueProvider.cs | 103 ------------------ .../Scale/IDynamicTargetValueProvider.cs | 13 --- .../Scale/ITargetScaleMonitor.cs | 18 --- .../Scale/ScaleStatus.cs | 5 + .../PublicSurfaceTests.cs | 5 +- .../Scale/DynamicTargetValueProviderTest.cs | 93 ---------------- 8 files changed, 7 insertions(+), 234 deletions(-) delete mode 100644 src/Microsoft.Azure.WebJobs.Host/Scale/DynamicTargetValueProvider.cs delete mode 100644 src/Microsoft.Azure.WebJobs.Host/Scale/IDynamicTargetValueProvider.cs delete mode 100644 src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScaleMonitor.cs delete mode 100644 test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/DynamicTargetValueProviderTest.cs diff --git a/build/common.props b/build/common.props index a13b7fd1b..66c94d835 100644 --- a/build/common.props +++ b/build/common.props @@ -1,7 +1,7 @@ - 3.0.35$(VersionSuffix) + 3.0.34$(VersionSuffix) 5.0.0-beta.2$(VersionSuffix) 4.0.3$(VersionSuffix) diff --git a/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsServiceCollectionExtensions.cs b/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsServiceCollectionExtensions.cs index b646268f9..48eb866aa 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsServiceCollectionExtensions.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsServiceCollectionExtensions.cs @@ -89,7 +89,6 @@ public static IWebJobsBuilder AddWebJobs(this IServiceCollection services, Actio services.TryAddSingleton(); services.TryAddSingleton(); - services.AddSingleton(); services.AddSingleton(); @@ -130,7 +129,6 @@ public static IWebJobsBuilder AddWebJobs(this IServiceCollection services, Actio services.TryAddEnumerable(ServiceDescriptor.Singleton()); services.TryAddSingleton(); services.TryAddEnumerable(ServiceDescriptor.Singleton()); - services.TryAddSingleton(); services.ConfigureOptions(); services.ConfigureOptions(); diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/DynamicTargetValueProvider.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/DynamicTargetValueProvider.cs deleted file mode 100644 index f66d14e7f..000000000 --- a/src/Microsoft.Azure.WebJobs.Host/Scale/DynamicTargetValueProvider.cs +++ /dev/null @@ -1,103 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using Microsoft.Extensions.Logging; -using Newtonsoft.Json; -using System; -using System.Threading; -using System.Threading.Tasks; - -namespace Microsoft.Azure.WebJobs.Host.Scale -{ - internal class DynamicTargetValueProvider : IDynamicTargetValueProvider - { - private TimeSpan _cachedInterval = TimeSpan.FromSeconds(30); - private TimeSpan _expiredSnapshotInterval = TimeSpan.FromSeconds(60); - private DateTime _lastSnapshotRead = DateTime.MinValue; - private HostConcurrencySnapshot _lastSnapshot; - private IConcurrencyStatusRepository _concurrencyStatusRepository; - private readonly ILogger _logger; - static SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1, 1); - - public DynamicTargetValueProvider(IConcurrencyStatusRepository concurrencyStatusRepository, ILoggerFactory loggerFactory) - { - _concurrencyStatusRepository = concurrencyStatusRepository; - _logger = loggerFactory.CreateLogger(); - } - - // for tests - internal DynamicTargetValueProvider(IConcurrencyStatusRepository concurrencyStatusRepository, ILoggerFactory loggerFactory, - TimeSpan cachedInterval, TimeSpan expiredSnapshotInterval) : this(concurrencyStatusRepository, loggerFactory) - { - _cachedInterval = cachedInterval; - _expiredSnapshotInterval = expiredSnapshotInterval; - } - - // for tests - public IConcurrencyStatusRepository ConcurrencyStatusRepository - { - set => _concurrencyStatusRepository = value; - } - - // for tests - public DateTime LastSnapshotRead - { - set => _lastSnapshotRead = value; - } - - public async Task GetDynamicTargetValueAsync(string functionId) - { - DateTime now = DateTime.UtcNow; - int fallback = -1; // default fallback - - _logger.LogDebug($"Getting dynamic target value for function '{functionId}'"); - await _semaphoreSlim.WaitAsync(); - try - { - if (_concurrencyStatusRepository == null) - { - _logger.LogDebug($"Returning fallback. Snapshot repository does not exists."); - } - else - { - // Update host snaphsot if cache is expired or snapshot is expired - if (now - _lastSnapshotRead > _cachedInterval) - { - _lastSnapshotRead = now; - _lastSnapshot = await _concurrencyStatusRepository.ReadAsync(CancellationToken.None); // TODO: CancellationToken.None - _logger.LogDebug($"Snapshot is updated: '{JsonConvert.SerializeObject(_lastSnapshot)}'"); - } - - // Return value from - if (_lastSnapshot != null && now - _lastSnapshot.Timestamp < _expiredSnapshotInterval) - { - if (_lastSnapshot.FunctionSnapshots.TryGetValue(functionId, out FunctionConcurrencySnapshot functionConcurrencySnapshot)) - { - _logger.LogDebug($"Returning '{functionConcurrencySnapshot.Concurrency}' as target value for function '{functionId}'"); - return functionConcurrencySnapshot.Concurrency; - } - else - { - _logger.LogDebug($"Returning fallback. Snapshot for the function '{functionId}' is not found."); - } - } - else - { - _logger.LogDebug($"Returning fallback. Snapshot for the host is expired or not found."); - } - } - } - catch (Exception ex) - { - // in case of an exception return fallback - _logger.LogError($"Returning fallback. An exception was occured during reading dynamic target value for the funciton '{functionId}' due to exception {ex}."); - } - finally - { - _semaphoreSlim.Release(); - } - - return fallback; - } - } -} \ No newline at end of file diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/IDynamicTargetValueProvider.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/IDynamicTargetValueProvider.cs deleted file mode 100644 index 331afc979..000000000 --- a/src/Microsoft.Azure.WebJobs.Host/Scale/IDynamicTargetValueProvider.cs +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using Microsoft.Azure.WebJobs.Host.Executors; -using System.Threading.Tasks; - -namespace Microsoft.Azure.WebJobs.Host.Scale -{ - public interface IDynamicTargetValueProvider - { - Task GetDynamicTargetValueAsync(string functionId); - } -} diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScaleMonitor.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScaleMonitor.cs deleted file mode 100644 index ac2711069..000000000 --- a/src/Microsoft.Azure.WebJobs.Host/Scale/ITargetScaleMonitor.cs +++ /dev/null @@ -1,18 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. -using System.Threading.Tasks; - -namespace Microsoft.Azure.WebJobs.Host.Scale -{ - public interface ITargetScaleMonitor : IScaleMonitor - { - Task GetScaleVoteAsync(ScaleStatusContext context); - } - - public interface ITargetScaleMonitor : ITargetScaleMonitor where TMetrics : ScaleMetrics - { - new Task GetMetricsAsync(); - - Task GetScaleVoteAsync(ScaleStatusContext context); - } -} diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleStatus.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleStatus.cs index f66bb3e65..ba7aee346 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleStatus.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleStatus.cs @@ -12,5 +12,10 @@ public class ScaleStatus /// Gets or sets the current scale decision. /// public ScaleVote Vote { get; set; } + + /// + /// Get or sets target worker count. + /// + public int? TargetWorkerCount { get; set; } } } diff --git a/test/Microsoft.Azure.WebJobs.Host.UnitTests/PublicSurfaceTests.cs b/test/Microsoft.Azure.WebJobs.Host.UnitTests/PublicSurfaceTests.cs index 55df3a426..27021688d 100644 --- a/test/Microsoft.Azure.WebJobs.Host.UnitTests/PublicSurfaceTests.cs +++ b/test/Microsoft.Azure.WebJobs.Host.UnitTests/PublicSurfaceTests.cs @@ -297,10 +297,7 @@ public void WebJobs_Host_VerifyPublicSurfaceArea() "FunctionActivityStatus", "IFunctionActivityStatusProvider", "SupportsRetryAttribute", - "AppServicesHostingUtility", - "IDynamicTargetValueProvider", - "ITargetScaleMonitor", - "ITargetScaleMonitor`1" + "AppServicesHostingUtility" }; TestHelpers.AssertPublicTypes(expected, assembly); diff --git a/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/DynamicTargetValueProviderTest.cs b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/DynamicTargetValueProviderTest.cs deleted file mode 100644 index db35e65ee..000000000 --- a/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/DynamicTargetValueProviderTest.cs +++ /dev/null @@ -1,93 +0,0 @@ -// Copyright (c) .NET Foundation. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using Microsoft.Azure.WebJobs.Host.TestCommon; -using System.Threading.Tasks; -using System; -using Xunit; -using Microsoft.Azure.WebJobs.Host.Scale; -using Moq; -using Microsoft.Extensions.Primitives; -using System.Threading; -using System.Collections.Generic; -using Azure.Core; -using Microsoft.Extensions.Logging; -using System.Linq; - -namespace Microsoft.Azure.WebJobs.Host.UnitTests.Scale -{ - [Trait(TestTraits.CategoryTraitName, TestTraits.DynamicConcurrency)] - public class DynamicTargetValueProviderTest - { - private readonly ILoggerFactory _loggerFactory; - private readonly TestLoggerProvider _loggerProvier; - - public DynamicTargetValueProviderTest() - { - _loggerFactory = new LoggerFactory(); - _loggerProvier = new TestLoggerProvider(); - _loggerFactory.AddProvider(_loggerProvier); - } - - [Fact] - public async Task GetDynamicTargetValue_NoRepository() - { - DynamicTargetValueProvider dynamicTargetValueProvider = new DynamicTargetValueProvider(null, _loggerFactory); - int result = await dynamicTargetValueProvider.GetDynamicTargetValueAsync("func1"); - Assert.Contains("Returning fallback. Snapshot repository does not exists.", _loggerProvier.GetAllLogMessages().Select(x => x.FormattedMessage)); - Assert.Equal(result, -1); - } - - [Theory] - [InlineData(60, 60, "func1", 1)] // getting value from cache - [InlineData(60, 45, "func1", 1)] // getting value from cache - [InlineData(60, 35, "func1", -1)] // snapshot is expired - [InlineData(20, 60, "func1", 4)] // getting value from last snapshot - [InlineData(60, 60, "func4", -1)] // funciton is not found in the spapshot - public async Task GetDynamicTargetValue_ReturnsExpected( - int cachedIntervalInSec, - int expiredSnapshotIntervalInSec, - string functionName, - int expectedConcurrency) - { - // Set last snapshot - IConcurrencyStatusRepository repo = CreateConcurrencyStatusRepository(DateTime.Now.AddSeconds(-40), - new string[] { "func1", "func2", "func3" }, - new int[] { 1, 2, 3 }); - DynamicTargetValueProvider dynamicTargetValueProvider = new DynamicTargetValueProvider(repo, _loggerFactory, - TimeSpan.FromSeconds(cachedIntervalInSec), TimeSpan.FromSeconds(expiredSnapshotIntervalInSec)); - await dynamicTargetValueProvider.GetDynamicTargetValueAsync(functionName); - dynamicTargetValueProvider.LastSnapshotRead = DateTime.Now.AddSeconds(-30); - - DateTime now = DateTime.Now; - - repo = CreateConcurrencyStatusRepository(DateTime.Now.AddSeconds(-20), - new string[] { "func1", "func2", "func3" }, - new int[] { 4, 5, 6 }); - dynamicTargetValueProvider.ConcurrencyStatusRepository = repo; - int result = await dynamicTargetValueProvider.GetDynamicTargetValueAsync(functionName); - - var logs = _loggerProvier.GetAllLogMessages().ToArray(); - Assert.Equal(result, expectedConcurrency); - } - - private IConcurrencyStatusRepository CreateConcurrencyStatusRepository(DateTime timestamp, string[] functionNames, int[] concurrecnies) - { - HostConcurrencySnapshot hostConcurrencySnapshot = new HostConcurrencySnapshot() - { - Timestamp = timestamp - }; - - var functionSnapshots = new Dictionary(); - for (int i = 0; i < functionNames.Length; i++) - { - functionSnapshots[functionNames[i]] = new FunctionConcurrencySnapshot() { Concurrency = concurrecnies[i] }; - } - hostConcurrencySnapshot.FunctionSnapshots = functionSnapshots; - - Mock mockoncurrencyStatusRepository = new Mock(); - mockoncurrencyStatusRepository.Setup(x => x.ReadAsync(It.IsAny())).ReturnsAsync(hostConcurrencySnapshot); - return mockoncurrencyStatusRepository.Object; - } - } -}