diff --git a/build/common.props b/build/common.props index d73d80333..bbf758237 100644 --- a/build/common.props +++ b/build/common.props @@ -1,7 +1,7 @@ - 3.0.36$(VersionSuffix) + 3.0.36-11991 5.0.0-beta.2$(VersionSuffix) 4.0.3$(VersionSuffix) diff --git a/src/Microsoft.Azure.WebJobs.Host.Storage/BlobStorageConcurrencyStatusRepository.cs b/src/Microsoft.Azure.WebJobs.Host.Storage/BlobStorageConcurrencyStatusRepository.cs index 2a7a6b902..914f6b598 100644 --- a/src/Microsoft.Azure.WebJobs.Host.Storage/BlobStorageConcurrencyStatusRepository.cs +++ b/src/Microsoft.Azure.WebJobs.Host.Storage/BlobStorageConcurrencyStatusRepository.cs @@ -17,7 +17,7 @@ namespace Microsoft.Azure.WebJobs.Host { - internal class BlobStorageConcurrencyStatusRepository : IConcurrencyStatusRepository + public class BlobStorageConcurrencyStatusRepository : IConcurrencyStatusRepository { private readonly IHostIdProvider _hostIdProvider; private readonly ILogger _logger; diff --git a/src/Microsoft.Azure.WebJobs.Host/Hosting/ITriggerScaleConfigure.cs b/src/Microsoft.Azure.WebJobs.Host/Hosting/ITriggerScaleConfigure.cs new file mode 100644 index 000000000..06139871b --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Hosting/ITriggerScaleConfigure.cs @@ -0,0 +1,24 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using Microsoft.Extensions.Hosting; + +namespace Microsoft.Azure.WebJobs.Hosting +{ + /// + /// Interface defining a trigger scale configuration action. + /// as part of host startup. + /// + public interface IConfigureTriggerScale + { + /// + /// Performs the scale configuration action for a trigger. The host will call this + /// method at the right time during scale host initialization for each trigger. + /// + /// The that can be used to + /// configure the trigger scale. + /// he hat can be used to + /// configure the trigger scale. + void ConfigureTriggerScale(IHostBuilder builder, HostBuilderContext triggerScaleContext); + } +} diff --git a/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsHostBuilderExtensions.cs b/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsHostBuilderExtensions.cs index e48884109..80bda5934 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsHostBuilderExtensions.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsHostBuilderExtensions.cs @@ -4,6 +4,7 @@ using System; using System.Linq; using Microsoft.Azure.WebJobs; +using Microsoft.Azure.WebJobs.Host.Scale; using Microsoft.Azure.WebJobs.Hosting; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration.EnvironmentVariables; @@ -135,6 +136,22 @@ public static IHostBuilder ConfigureWebJobs(this IHostBuilder builder, Action + /// Applies scale configuration to the specified . + /// + /// The to configure. + /// The . + public static IHostBuilder AddScale(this IHostBuilder builder) + { + builder.ConfigureServices((services) => + { + services.TryAddSingleton(); + services.TryAddSingleton(); + }); + + return builder; + } + private static IConfigurationBuilder TryAddDefaultConfigurationSources(this IConfigurationBuilder config) { if (!config.Sources.OfType().Any(p => string.Equals(p.Path, "appsettings.json", StringComparison.OrdinalIgnoreCase))) diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/IScaleManager.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/IScaleManager.cs new file mode 100644 index 000000000..e04af2997 --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/IScaleManager.cs @@ -0,0 +1,27 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Microsoft.Azure.WebJobs.Host.Scale +{ + public interface IScaleManager + { + /// + /// Gets aggrigated for all the trigger. + /// + /// The scale status context. + /// + Task GetScaleStatusAsync(ScaleStatusContext context); + + /// + /// Gets for each trigger. + /// + /// + /// + Task> GetScaleStatusesAsync(ScaleStatusContext context); + } +} diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/IScaleMetricsRepository.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/IScaleMetricsRepository.cs new file mode 100644 index 000000000..2f836b1be --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/IScaleMetricsRepository.cs @@ -0,0 +1,28 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Microsoft.Azure.WebJobs.Host.Scale +{ + /// + /// Interface defining methods for reading/writing scale metrics to a persistent store. + /// + public interface IScaleMetricsRepository + { + /// + /// Persist the metrics for each monitor. + /// + /// The collection of metrics for each monitor. + /// A task. + Task WriteMetricsAsync(IDictionary monitorMetrics); + + /// + /// Read the metrics. + /// + /// The current collection of monitors. + /// Map of metrics per monitor. + Task>> ReadMetricsAsync(IEnumerable monitors); + } +} diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleManager.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleManager.cs new file mode 100644 index 000000000..41bd4a5a2 --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleManager.cs @@ -0,0 +1,306 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.Azure.WebJobs.Host.Scale +{ + /// + /// Class to get scaling votes for all the triggers or for an individual trigger. + /// + internal class ScaleManager : IScaleManager + { + private readonly IScaleMonitorManager _monitorManager; + private readonly ITargetScalerManager _targetScalerManager; + private readonly IScaleMetricsRepository _metricsRepository; + private readonly IConcurrencyStatusRepository _concurrencyStatusRepository; + private readonly ILogger _logger; + private readonly HashSet _targetScalersInError; + private IOptions _scaleOptions; + + public ScaleManager( + IScaleMonitorManager monitorManager, + ITargetScalerManager targetScalerManager, + IScaleMetricsRepository metricsRepository, + IConcurrencyStatusRepository concurrencyStatusRepository, + IOptions scaleConfiguration, + ILoggerFactory loggerFactory) + { + _monitorManager = monitorManager; + _targetScalerManager = targetScalerManager; + _metricsRepository = metricsRepository; + _concurrencyStatusRepository = concurrencyStatusRepository; + _logger = loggerFactory?.CreateLogger(); + _targetScalersInError = new HashSet(); + _scaleOptions = scaleConfiguration; + } + + // for mock testing only + internal ScaleManager() + { + } + + + /// + /// Gets overall scale status for all the triggers. + /// + /// The scale status context + /// A task that returns for all ther triggers. + public async Task GetScaleStatusAsync(ScaleStatusContext context) + { + GetScalersToSample(out List scaleMonitorsToProcess, out List targetScalersToProcess); + + var scaleMonitorStatuses = await GetScaleMonitorsResultAsync(context, scaleMonitorsToProcess); + var targetScalerStatuses = await GetTargetScalersResultAsync(context, targetScalersToProcess); + + return new ScaleStatus + { + Vote = GetAggregateScaleVote(scaleMonitorStatuses.Values.Select(x => x.Vote).Union(targetScalerStatuses.Select(x => x.Value.Vote)), context, _logger), + TargetWorkerCount = targetScalerStatuses.Any() ? targetScalerStatuses.Max(x => x.Value.TargetWorkerCount) : null + }; + } + + /// + /// Gets scale status for individual triggers. + /// + /// The scale status context + /// A task that returns dictionary of which represents each trigger. + public async Task> GetScaleStatusesAsync(ScaleStatusContext context) + { + GetScalersToSample(out List scaleMonitorsToProcess, out List targetScalersToProcess); + + var scaleMonitorStatuses = await GetScaleMonitorsResultAsync(context, scaleMonitorsToProcess); + var targetScalerStatuses = await GetTargetScalersResultAsync(context, targetScalersToProcess); + + return scaleMonitorStatuses.Concat(targetScalerStatuses).ToDictionary(s => s.Key, s => s.Value); + } + + private async Task> GetScaleMonitorsResultAsync(ScaleStatusContext context, IEnumerable scaleMonitorsToProcess) + { + Dictionary votes = new Dictionary(); + if (scaleMonitorsToProcess != null && scaleMonitorsToProcess.Any()) + { + // get the collection of current metrics for each monitor + var monitorMetrics = await _metricsRepository.ReadMetricsAsync(scaleMonitorsToProcess); + + _logger.LogDebug($"Computing scale status (WorkerCount={context.WorkerCount})"); + _logger.LogDebug($"{monitorMetrics.Count} scale monitors to sample"); + + // for each monitor, ask it to return its scale status (vote) based on + // the metrics and context info (e.g. worker count) + foreach (var pair in monitorMetrics) + { + var monitor = pair.Key; + var metrics = pair.Value; + + try + { + // create a new context instance to avoid modifying the + // incoming context + var scaleStatusContext = new ScaleStatusContext + { + WorkerCount = context.WorkerCount, + Metrics = metrics + }; + var result = monitor.GetScaleStatus(scaleStatusContext); + + _logger.LogDebug($"Monitor '{monitor.Descriptor.Id}' voted '{result.Vote.ToString()}'"); + string key = monitor.Descriptor.FunctionId ?? monitor.Descriptor.Id; + votes.Add(key, new ScaleStatus() + { + Vote = result.Vote + }); + } + catch (Exception exc) when (!exc.IsFatal()) + { + // if a particular monitor fails, log and continue + _logger.LogError(exc, $"Failed to query scale status for monitor '{monitor.Descriptor.Id}'."); + } + } + } + else + { + // no monitors registered + // this can happen if the host is offline + } + + return votes; + } + + private async Task> GetTargetScalersResultAsync(ScaleStatusContext context, IEnumerable targetScalersToProcess) + { + Dictionary targetScaleVotes = new Dictionary(); + + if (targetScalersToProcess != null && targetScalersToProcess.Any()) + { + _logger.LogDebug($"{targetScalersToProcess.Count()} target scalers to sample"); + HostConcurrencySnapshot snapshot = null; + try + { + snapshot = await _concurrencyStatusRepository.ReadAsync(CancellationToken.None); + } + catch (Exception exc) when (!exc.IsFatal()) + { + _logger.LogError(exc, $"Failed to read concurrency status repository"); + } + + foreach (var targetScaler in targetScalersToProcess) + { + try + { + TargetScalerContext targetScaleStatusContext = new TargetScalerContext(); + if (snapshot != null) + { + if (snapshot.FunctionSnapshots.TryGetValue(targetScaler.TargetScalerDescriptor.FunctionId, out var functionSnapshot)) + { + targetScaleStatusContext.InstanceConcurrency = functionSnapshot.Concurrency; + _logger.LogDebug($"Snapshot dynamic concurrency for target scaler '{targetScaler.TargetScalerDescriptor.FunctionId}' is '{functionSnapshot.Concurrency}'"); + } + } + TargetScalerResult result = null; + try + { + result = await targetScaler.GetScaleResultAsync(targetScaleStatusContext); + } + catch (NotSupportedException ex) + { + string targetScalerUniqueId = GetTargetScalerFunctionUniqueId(targetScaler); + _logger.LogWarning($"Unable to use target based scaling for Function '{targetScaler.TargetScalerDescriptor.FunctionId}'. Metrics monitoring will be used.", ex); + _targetScalersInError.Add(targetScalerUniqueId); + + // Adding ScaleVote.None vote + result = new TargetScalerResult + { + TargetWorkerCount = context.WorkerCount + }; + } + _logger.LogDebug($"Target worker count for '{targetScaler.TargetScalerDescriptor.FunctionId}' is '{result.TargetWorkerCount}'"); + + ScaleVote vote = ScaleVote.None; + if (context.WorkerCount > result.TargetWorkerCount) + { + vote = ScaleVote.ScaleIn; + } + else if (context.WorkerCount < result.TargetWorkerCount) + { + vote = ScaleVote.ScaleOut; + } + + targetScaleVotes.Add(targetScaler.TargetScalerDescriptor.FunctionId, new ScaleStatus + { + TargetWorkerCount = result.TargetWorkerCount, + Vote = vote + }); + } + catch (Exception exc) when (!exc.IsFatal()) + { + // if a particular target scaler fails, log and continue + _logger.LogError(exc, $"Failed to query scale result for target scaler '{targetScaler.TargetScalerDescriptor.FunctionId}'."); + } + } + } + return targetScaleVotes; + } + + /// + /// Returns scale monitors and target scalers we want to use based on the configuration. + /// Scaler monitor will be ignored if a target scaler is defined in the same extensions assembly and TBS is enabled. + /// + /// Scale monitor to process. + /// Target scaler to process. + internal virtual void GetScalersToSample( + out List scaleMonitorsToSample, + out List targetScalersToSample) + { + var scaleMonitors = _monitorManager.GetMonitors(); + var targetScalers = _targetScalerManager.GetTargetScalers(); + + scaleMonitorsToSample = new List(); + targetScalersToSample = new List(); + + // Check if TBS enabled on app level + if (_scaleOptions.Value.IsTargetBasedScalingEnabled) + { + HashSet targetScalerFunctions = new HashSet(); + foreach (var scaler in targetScalers) + { + string scalerUniqueId = GetTargetScalerFunctionUniqueId(scaler); + if (!_targetScalersInError.Contains(scalerUniqueId)) + { + if (_scaleOptions.Value.IsTargetBasedScalingEnabledForTriggerFunc(scaler)) + { + targetScalersToSample.Add(scaler); + targetScalerFunctions.Add(scalerUniqueId); + } + } + } + + foreach (var monitor in scaleMonitors) + { + string monitorUniqueId = GetScaleMonitorFunctionUniqueId(monitor); + // Check if target based scaler exists for the function + if (!targetScalerFunctions.Contains(monitorUniqueId)) + { + scaleMonitorsToSample.Add(monitor); + } + } + } + else + { + scaleMonitorsToSample.AddRange(scaleMonitors); + } + } + + internal static ScaleVote GetAggregateScaleVote(IEnumerable votes, ScaleStatusContext context, ILogger logger) + { + ScaleVote vote = ScaleVote.None; + if (votes.Any()) + { + // aggregate all the votes into a single vote + if (votes.Any(p => p == ScaleVote.ScaleOut)) + { + // scale out if at least 1 monitor requires it + logger?.LogDebug("Scaling out based on votes"); + vote = ScaleVote.ScaleOut; + } + else if (context.WorkerCount > 0 && votes.All(p => p == ScaleVote.ScaleIn)) + { + // scale in only if all monitors vote scale in + logger?.LogDebug("Scaling in based on votes"); + vote = ScaleVote.ScaleIn; + } + } + else if (context.WorkerCount > 0) + { + // if no functions exist or are enabled we'll scale in + logger?.LogDebug("No enabled functions or scale votes so scaling in"); + vote = ScaleVote.ScaleIn; + } + + return vote; + } + + private string GetTargetScalerFunctionUniqueId(ITargetScaler scaler) + { + return $"{GetAssemblyName(scaler.GetType())}-{scaler.TargetScalerDescriptor.FunctionId}"; + } + + + private string GetScaleMonitorFunctionUniqueId(IScaleMonitor monitor) + { + return $"{GetAssemblyName(monitor.GetType())}-{monitor.Descriptor.FunctionId}"; + } + + + protected string GetAssemblyName(Type type) + { + return type.Assembly.GetName().Name; + } + } +} diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleMonitorService.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleMonitorService.cs new file mode 100644 index 000000000..b23b963d7 --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleMonitorService.cs @@ -0,0 +1,141 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using Microsoft.Extensions.Hosting; +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using System.Threading; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json; +using System.Linq; +using Microsoft.Extensions.Options; + +namespace Microsoft.Azure.WebJobs.Host.Scale +{ + /// + /// Class to store scale metrics. + /// + internal class ScaleMonitorService : IHostedService, IDisposable + { + private readonly Timer _timer; + private IScaleManager _scaleManager; + private IScaleMetricsRepository _metricsRepository; + private bool _disposed; + protected readonly ILogger _logger; + private IOptions _scaleOptions; + + public ScaleMonitorService(IScaleManager scaleManager, IScaleMetricsRepository metricsRepository, IOptions scaleOptions, ILoggerFactory loggerFactory) + { + _scaleManager = scaleManager; + _metricsRepository = metricsRepository; + _timer = new Timer(OnTimer, null, Timeout.Infinite, Timeout.Infinite); + _logger = loggerFactory.CreateLogger(); + _scaleOptions = scaleOptions; + } + + public virtual Task StartAsync(CancellationToken cancellationToken) + { + // start the timer by setting the due time + _logger.LogInformation("Scale monitor service started is started."); + SetTimerInterval((int)_scaleOptions.Value.ScaleMetricsSampleInterval.TotalMilliseconds); + + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + // stop the timer if it has been started + _timer.Change(Timeout.Infinite, Timeout.Infinite); + + return Task.CompletedTask; + } + + protected async virtual void OnTimer(object state) + { + await TakeMetricsSamplesAsync(); + SetTimerInterval((int)_scaleOptions.Value.ScaleMetricsSampleInterval.TotalMilliseconds); + } + + public void Dispose() + { + Dispose(true); + } + + private async Task TakeMetricsSamplesAsync() + { + try + { + (_scaleManager as ScaleManager).GetScalersToSample(out List scaleMonitorsToProcess, out List targetScalersToProcess); + + if (scaleMonitorsToProcess.Any()) + { + _logger.LogDebug($"Taking metrics samples for {scaleMonitorsToProcess.Count()} monitor(s)."); + + var metricsMap = new Dictionary(); + foreach (var monitor in scaleMonitorsToProcess) + { + ScaleMetrics metrics = null; + try + { + // take a metrics sample for each monitor + metrics = await monitor.GetMetricsAsync(); + metricsMap[monitor] = metrics; + + // log the metrics json to provide visibility into monitor activity + var json = JsonConvert.SerializeObject(metrics); + _logger.LogDebug($"Scale metrics sample for monitor '{monitor.Descriptor.Id}': {json}"); + } + catch (Exception exc) when (!exc.IsFatal()) + { + // if a particular monitor fails, log and continue + _logger.LogError(exc, $"Failed to collect scale metrics sample for monitor '{monitor.Descriptor.Id}'."); + } + } + + if (metricsMap.Count > 0) + { + // persist the metrics samples + await _metricsRepository.WriteMetricsAsync(metricsMap); + } + } + } + catch (Exception exc) when (!exc.IsFatal()) + { + _logger.LogError(exc, "Failed to collect/persist scale metrics."); + } + } + + private void SetTimerInterval(int dueTime) + { + if (!_disposed) + { + var timer = _timer; + if (timer != null) + { + try + { + _timer.Change(dueTime, Timeout.Infinite); + } + catch (ObjectDisposedException) + { + // might race with dispose + } + } + } + } + + private void Dispose(bool disposing) + { + if (!_disposed) + { + if (disposing) + { + _timer.Dispose(); + } + + _disposed = true; + } + } + } +} diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleOptions.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleOptions.cs new file mode 100644 index 000000000..95e31be1d --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleOptions.cs @@ -0,0 +1,97 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; +using System; + +namespace Microsoft.Azure.WebJobs.Host.Scale +{ + /// + /// Configuration for scaling. + /// + public class ScaleOptions + { + private TimeSpan _scaleMetricsMaxAge; + private TimeSpan _scaleMetricsSampleInterval; + + public ScaleOptions() + { + // At the default values, a single monitor will be generating 6 samples per minute + // so at 2 minutes that's 12 samples + // Assume a case of 100 functions in an app, each mapping to a monitor. Thats + // 1200 samples to read from storage on each scale status request. + ScaleMetricsMaxAge = TimeSpan.FromMinutes(2); + ScaleMetricsSampleInterval = TimeSpan.FromSeconds(10); + MetricsPurgeEnabled = true; + } + + /// + /// Gets or sets a value indicating the maximum age for metrics. + /// Metrics that exceed this age will not be returned to monitors. + /// + public TimeSpan ScaleMetricsMaxAge + { + get + { + return _scaleMetricsMaxAge; + } + + set + { + if (value < TimeSpan.FromMinutes(1) || value > TimeSpan.FromMinutes(5)) + { + throw new ArgumentOutOfRangeException(nameof(ScaleMetricsMaxAge)); + } + _scaleMetricsMaxAge = value; + } + } + + /// + /// Gets or sets the sampling interval for metrics. + /// + public TimeSpan ScaleMetricsSampleInterval + { + get + { + return _scaleMetricsSampleInterval; + } + + set + { + if (value < TimeSpan.FromSeconds(1) || value > TimeSpan.FromSeconds(30)) + { + throw new ArgumentOutOfRangeException(nameof(ScaleMetricsSampleInterval)); + } + _scaleMetricsSampleInterval = value; + } + } + + /// + /// Gets or sets a value indicating whether old metrics data + /// will be auto purged. + /// + public bool MetricsPurgeEnabled { get; set; } + + public string Format() + { + var options = new JObject + { + { nameof(ScaleMetricsMaxAge), ScaleMetricsMaxAge }, + { nameof(ScaleMetricsSampleInterval), ScaleMetricsSampleInterval } + }; + + return options.ToString(Formatting.Indented); + } + + /// + /// Gets or sets if target base scale is enabled. + /// + public bool IsTargetBasedScalingEnabled { get; set; } + + /// + /// Gets or sets the function to checks if target scaler is supported for specific + /// + public Func IsTargetBasedScalingEnabledForTriggerFunc { get; set; } + } +} diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleStatus.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleStatus.cs index f66bb3e65..e064af347 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleStatus.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleStatus.cs @@ -12,5 +12,10 @@ public class ScaleStatus /// Gets or sets the current scale decision. /// public ScaleVote Vote { get; set; } + + /// + /// Gets or sets the current target worker count. + /// + public int? TargetWorkerCount { get; set; } } } diff --git a/test/Microsoft.Azure.WebJobs.Host.EndToEndTests/Scale/ScaleHostEndToEndTests.cs b/test/Microsoft.Azure.WebJobs.Host.EndToEndTests/Scale/ScaleHostEndToEndTests.cs new file mode 100644 index 000000000..ea2bb90e2 --- /dev/null +++ b/test/Microsoft.Azure.WebJobs.Host.EndToEndTests/Scale/ScaleHostEndToEndTests.cs @@ -0,0 +1,352 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using Microsoft.Azure.WebJobs.Host.Executors; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Azure.WebJobs.Host.Storage; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using System.Threading; +using Xunit; +using Microsoft.Azure.WebJobs.Host.TestCommon; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json.Linq; +using Microsoft.Extensions.Configuration; +using System.IO; +using Microsoft.Azure.WebJobs.Hosting; +using Microsoft.Azure.WebJobs.Host.Triggers; +using System.Reflection; + +namespace Microsoft.Azure.WebJobs.Host.EndToEndTests.Scale +{ + public class ScaleHostEndToEndTests + { + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task ScaleManager_GetScaleStatusAsync_ReturnsExpected(bool tbsEnabled) + { + string triggerData = +@"[ +{ + ""type"": ""serviceBusTrigger"", + ""functionName"": ""Function1"" +}, +{ + ""type"": ""serviceBusTrigger"", + ""functionName"": ""Function2"" +}, +{ + ""type"": ""cosmosDbTrigger"", + ""functionName"": ""Function3"" +}, +]"; + + string hostJson = +@"{ + ""extensions"": { + ""serviceBus"" : { + ""maxConcurrentCalls"": 16 + }, + ""cosmosDb"" : { + ""maxConcurrentCalls"": 8 + } + } +}"; + + var loggerFactory = new LoggerFactory(); + var loggerProvider = new TestLoggerProvider(); + loggerFactory.AddProvider(loggerProvider); + + IHostBuilder hostBuilder = new HostBuilder(); + hostBuilder.ConfigureAppConfiguration((hostBuilderContext, config) => + { + // Adding host.json here + config.AddJsonStream(new MemoryStream(Encoding.ASCII.GetBytes(hostJson))); + + //Adding app setings + config.AddInMemoryCollection(new List>() + { + new KeyValuePair("app_setting1", "value1"), + new KeyValuePair("app_setting2", "value2") + }); + }) + .ConfigureServices(services => + { + // Add all the services need to initinate IScaleManager and IScaleMonitorService + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(new HostIdProviderImpl()); + services.AddSingleton(); + services.AddSingleton(new ScaleMetricsRepositoryImpl()); + services.AddSingleton(loggerFactory); + + services.AddOptions().Configure(options => + { + options.IsTargetBasedScalingEnabled = tbsEnabled; + options.IsTargetBasedScalingEnabledForTriggerFunc = (targetScaler) => + { + return true; + }; + }); + }) + .AddScale(); // Adding IScaleManager and IScaleMonitorService + + // Emulate "dynamic assemly loading" of extensions. In the end we need IConfigureTriggerScale instances. + Assembly assembly = Assembly.GetExecutingAssembly(); + Type extensionType1 = assembly.GetType("Microsoft.Azure.WebJobs.Host.EndToEndTests.Scale.ServiceBusExtension"); + IConfigureTriggerScale serviceBusHostScaleConfigurator = (IConfigureTriggerScale)Activator.CreateInstance(extensionType1); + Type extensionType2 = assembly.GetType("Microsoft.Azure.WebJobs.Host.EndToEndTests.Scale.CosmosDbExtension"); + IConfigureTriggerScale cosmosDbScaleConfigurator = (IConfigureTriggerScale)Activator.CreateInstance(extensionType2); + + // Iterate through all the triggers and add scalers for each trigger + var jarray = JArray.Parse(triggerData); + foreach (var jtoken in jarray) + { + var properties = new Dictionary() { { "triggerData", jtoken.ToString() } }; + HostBuilderContext hostBuilderContext = new HostBuilderContext(properties); + + if (string.Equals(jtoken["type"].ToString(), "serviceBusTrigger", StringComparison.InvariantCultureIgnoreCase)) + { + serviceBusHostScaleConfigurator.ConfigureTriggerScale(hostBuilder, hostBuilderContext); + } + + if (string.Equals(jtoken["type"].ToString(), "cosmosDbTrigger", StringComparison.InvariantCultureIgnoreCase)) + { + cosmosDbScaleConfigurator.ConfigureTriggerScale(hostBuilder, hostBuilderContext); + } + } + + IHost scaleHost = hostBuilder.Build(); + await scaleHost.StartAsync(); + + IHostedService scaleMonitorService = scaleHost.Services.GetService(); + Assert.NotNull(scaleMonitorService); + + await TestHelpers.Await(() => + { + IScaleManager scaleMonitor = scaleHost.Services.GetService(); + var scaleStatus = scaleMonitor.GetScaleStatusAsync(new ScaleStatusContext()).GetAwaiter().GetResult(); + var scaleStatuses = scaleMonitor.GetScaleStatusesAsync(new ScaleStatusContext()).GetAwaiter().GetResult(); + if (!tbsEnabled) + { + return scaleStatus.Vote == ScaleVote.ScaleOut && scaleStatus.TargetWorkerCount == null + && scaleStatuses["Function1"].Vote == ScaleVote.ScaleOut && scaleStatuses["Function1"].TargetWorkerCount == null + && scaleStatuses["Function2"].Vote == ScaleVote.ScaleOut && scaleStatuses["Function2"].TargetWorkerCount == null + && scaleStatuses["Function3"].Vote == ScaleVote.ScaleOut && scaleStatuses["Function3"].TargetWorkerCount == null; + } + else + { + return scaleStatus.Vote == ScaleVote.ScaleOut && scaleStatus.TargetWorkerCount == 4 + && scaleStatuses["Function1"].Vote == ScaleVote.ScaleOut && scaleStatuses["Function1"].TargetWorkerCount == 2 + && scaleStatuses["Function2"].Vote == ScaleVote.ScaleOut && scaleStatuses["Function2"].TargetWorkerCount == 3 + && scaleStatuses["Function3"].Vote == ScaleVote.ScaleOut && scaleStatuses["Function3"].TargetWorkerCount == 4; + } + }); + } + + private class ScaleMonitorManagerImpl : IScaleMonitorManager + { + private readonly List _scalers; + + public ScaleMonitorManagerImpl(IEnumerable providers) + { + _scalers = providers.Select(x => x.GetMonitor()).ToList(); + } + + public void Register(IScaleMonitor monitor) + { + throw new NotImplementedException(); + } + + public IEnumerable GetMonitors() + { + return _scalers; + } + } + + private class TargetScalerManagerImpl : ITargetScalerManager + { + private readonly List _scalers; + + public TargetScalerManagerImpl(IEnumerable providers) + { + _scalers = providers.Select(x => x.GetTargetScaler()).ToList(); + } + + public void Register(ITargetScaler monitor) + { + throw new NotImplementedException(); + } + + public IEnumerable GetTargetScalers() + { + return _scalers; + } + } + + private class HostIdProviderImpl : IHostIdProvider + { + public Task GetHostIdAsync(CancellationToken cancellationToken) + { + return Task.FromResult("test-host"); + } + } + + private class ConcurrencyStatusRepositoryImpl : IConcurrencyStatusRepository + { + public Task WriteAsync(HostConcurrencySnapshot snapshot, CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + + public Task ReadAsync(CancellationToken cancellationToken) + { + return Task.FromResult(new HostConcurrencySnapshot() + { + FunctionSnapshots = new Dictionary() + { + { "func1", new FunctionConcurrencySnapshot() { Concurrency = 1 } }, + { "func2", new FunctionConcurrencySnapshot() { Concurrency = 1 } } + } + }); + } + } + + private class ScaleMetricsRepositoryImpl : IScaleMetricsRepository + { + private IDictionary _cache; + + public Task WriteMetricsAsync(IDictionary monitorMetrics) + { + _cache = monitorMetrics; + return Task.CompletedTask; + } + + public Task>> ReadMetricsAsync(IEnumerable monitors) + { + + IDictionary> result = new Dictionary>(); + if (_cache != null) + { + foreach (var pair in _cache) + { + result[pair.Key] = new List(); + } + } + + return Task.FromResult(result); + } + } + + private class ScaleMonitorImpl : IScaleMonitor + { + public ScaleMonitorDescriptor Descriptor { get; set; } + + public ScaleMonitorImpl(string id, string functionId) + { + Descriptor = new ScaleMonitorDescriptor(id, functionId); + } + + public Task GetMetricsAsync() + { + return Task.FromResult(new ScaleMetrics()); + } + + public ScaleStatus GetScaleStatus(ScaleStatusContext context) + { + int targetWorkerCount = (int)Char.GetNumericValue(Descriptor.FunctionId[Descriptor.FunctionId.Length - 1]); + + return new ScaleStatus() + { + TargetWorkerCount = targetWorkerCount, + Vote = ScaleVote.ScaleOut + }; + } + } + + private class TargetScalerImpl : ITargetScaler + { + public TargetScalerDescriptor TargetScalerDescriptor { get; set; } + + public TargetScalerImpl(string functionId) + { + TargetScalerDescriptor = new TargetScalerDescriptor(functionId); + } + + public Task GetScaleResultAsync(TargetScalerContext context) + { + int targetWorkerCount = (int)Char.GetNumericValue(TargetScalerDescriptor.FunctionId[TargetScalerDescriptor.FunctionId.Length - 1]) + 1; + + return Task.FromResult(new TargetScalerResult() + { + TargetWorkerCount = targetWorkerCount + }); + } + } + + public class ScalerProvider : IScaleMonitorProvider, ITargetScalerProvider + { + private readonly string _functionName; + + public ScalerProvider(string functionName) + { + _functionName = functionName; + } + + public IScaleMonitor GetMonitor() + { + return new ScaleMonitorImpl(_functionName, _functionName); + } + + public ITargetScaler GetTargetScaler() + { + return new TargetScalerImpl(_functionName); + } + } + } + + internal class ServiceBusExtension : IConfigureTriggerScale + { + public void ConfigureTriggerScale(IHostBuilder builder, HostBuilderContext triggerScaleContext) + { + builder.ConfigureServices((context, services) => + { + // Getting an property + JObject triggerData = JObject.Parse(triggerScaleContext.Properties["triggerData"].ToString()); + string fucntionName = triggerData["functionName"].ToString(); + + var appSetting = context.Configuration.GetValue("app_setting1"); + Assert.NotNull(appSetting); + var hostJsonSetting = context.Configuration.GetValue("extensions:serviceBus:maxConcurrentCalls"); + Assert.NotNull(hostJsonSetting); + + var provider = new ScaleHostEndToEndTests.ScalerProvider(fucntionName); + services.AddSingleton(provider); + services.AddSingleton(provider); + }); + } + } + + internal class CosmosDbExtension : IConfigureTriggerScale + { + public void ConfigureTriggerScale(IHostBuilder builder, HostBuilderContext triggerScaleContext) + { + builder.ConfigureServices((context, services) => + { + // Getting an property + JObject triggerData = JObject.Parse(triggerScaleContext.Properties["triggerData"].ToString()); + string fucntionName = triggerData["functionName"].ToString(); + + var provider = new ScaleHostEndToEndTests.ScalerProvider(fucntionName); + services.AddSingleton(provider); + services.AddSingleton(provider); + }); + } + } +} diff --git a/test/Microsoft.Azure.WebJobs.Host.FunctionalTests/PublicSurfaceTests.cs b/test/Microsoft.Azure.WebJobs.Host.FunctionalTests/PublicSurfaceTests.cs index c16546396..3a9df400c 100644 --- a/test/Microsoft.Azure.WebJobs.Host.FunctionalTests/PublicSurfaceTests.cs +++ b/test/Microsoft.Azure.WebJobs.Host.FunctionalTests/PublicSurfaceTests.cs @@ -24,7 +24,8 @@ public void WebJobs_Host_Storage_VerifyPublicSurfaceArea() "JobHostInternalStorageOptions", "RuntimeStorageWebJobsBuilderExtensions", "StorageServiceCollectionExtensions", - "IAzureBlobStorageProvider" + "IAzureBlobStorageProvider", + "BlobStorageConcurrencyStatusRepository" }; TestHelpers.AssertPublicTypes(expected, assembly); diff --git a/test/Microsoft.Azure.WebJobs.Host.UnitTests/PublicSurfaceTests.cs b/test/Microsoft.Azure.WebJobs.Host.UnitTests/PublicSurfaceTests.cs index 438e31804..eff2dbde5 100644 --- a/test/Microsoft.Azure.WebJobs.Host.UnitTests/PublicSurfaceTests.cs +++ b/test/Microsoft.Azure.WebJobs.Host.UnitTests/PublicSurfaceTests.cs @@ -304,7 +304,11 @@ public void WebJobs_Host_VerifyPublicSurfaceArea() "ITargetScalerProvider", "TargetScalerDescriptor", "TargetScalerResult", - "TargetScalerContext" + "TargetScalerContext", + "IScaleMetricsRepository", + "IScaleManager", + "ScaleOptions", + "IConfigureTriggerScale" }; TestHelpers.AssertPublicTypes(expected, assembly); diff --git a/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/ScaleManagerTests.cs b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/ScaleManagerTests.cs new file mode 100644 index 000000000..f08757309 --- /dev/null +++ b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/ScaleManagerTests.cs @@ -0,0 +1,393 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Azure.WebJobs.Host.TestCommon; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Moq; +using Xunit; + +namespace Microsoft.Azure.WebJobs.Host.UnitTests.Scale +{ + public class ScaleManagerTests + { + private readonly Mock _monitorManagerMock; + private readonly Mock _metricsRepositoryMock; + private readonly Mock _targetScalerManagerMock; + private readonly Mock _concurrencyStatusRepositoryMock; + private readonly ILoggerFactory _loggerFactory; + private readonly TestLoggerProvider _loggerProvider; + private readonly List _monitors; + private readonly List _targetScalers; + private readonly ILogger _testLogger; + private readonly IOptions _scaleOptions; + + public ScaleManagerTests() + { + _monitors = new List(); + _targetScalers = new List(); + _loggerProvider = new TestLoggerProvider(); + _loggerFactory = new LoggerFactory(); + _loggerFactory.AddProvider(_loggerProvider); + _testLogger = _loggerFactory.CreateLogger("Test"); + + _monitorManagerMock = new Mock(MockBehavior.Strict); + _monitorManagerMock.Setup(p => p.GetMonitors()).Returns(() => _monitors); + _metricsRepositoryMock = new Mock(MockBehavior.Strict); + _metricsRepositoryMock.Setup(x => x.ReadMetricsAsync(It.IsAny>())).ReturnsAsync(new Dictionary>()); + _targetScalerManagerMock = new Mock(MockBehavior.Strict); + _targetScalerManagerMock.Setup(p => p.GetTargetScalers()).Returns(() => _targetScalers); + _concurrencyStatusRepositoryMock = new Mock(MockBehavior.Strict); + _concurrencyStatusRepositoryMock.Setup(p => p.ReadAsync(It.IsAny())).ReturnsAsync( + new HostConcurrencySnapshot() + { + FunctionSnapshots = new Dictionary() + { + { "func1", new FunctionConcurrencySnapshot() { Concurrency = 1 } } + } + }); + _scaleOptions = Options.Create(new ScaleOptions() + { + IsTargetBasedScalingEnabled= true, + ScaleMetricsSampleInterval = TimeSpan.FromSeconds(10) + }); + } + + [Theory] + [InlineData(0, ScaleVote.None)] + [InlineData(1, ScaleVote.ScaleIn)] + public async Task GetScaleStatus_NoMonitors_ReturnsExpectedStatus(int workerCount, ScaleVote expected) + { + var context = new ScaleStatusContext + { + WorkerCount = workerCount + }; + ScaleManager scaleManager = new ScaleManager(_monitorManagerMock.Object, _targetScalerManagerMock.Object, _metricsRepositoryMock.Object, _concurrencyStatusRepositoryMock.Object, _scaleOptions, _loggerFactory); + var status = await scaleManager.GetScaleStatusAsync(context); + + Assert.Equal(expected, status.Vote); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task GetScaleStatus_ReturnsExpectedResult(bool tbsEnabled) + { + var context = new ScaleStatusContext + { + WorkerCount = 3 + }; + + var monitor1 = new TestScaleMonitor("func1-test-test", "func1"); + monitor1.Status = new ScaleStatus + { + Vote = ScaleVote.ScaleIn + }; + var monitor2 = new TestScaleMonitor1(); + monitor2.Status = new ScaleStatus + { + Vote = ScaleVote.ScaleOut + }; + var monitor3 = new TestScaleMonitor2(); + monitor3.Status = new ScaleStatus + { + Vote = ScaleVote.ScaleIn + }; + + List monitors = new List + { + monitor1 + }; + if (!tbsEnabled) + { + monitors.Add(monitor2); + monitors.Add(monitor3); + } + _monitorManagerMock.Setup(p => p.GetMonitors()).Returns(monitors); + + var monitorMetrics = new Dictionary> + { + { monitor1, new List() }, + { monitor2, new List() }, + { monitor3, new List() } + }; + _metricsRepositoryMock.Setup(p => p.ReadMetricsAsync(It.IsAny>())).ReturnsAsync(monitorMetrics); + + var targetScaler1 = new TestTargetScaler() + { + Result = new TargetScalerResult() + { + TargetWorkerCount = 2 + }, + TargetScalerDescriptor = new TargetScalerDescriptor("func1") + }; + + _targetScalerManagerMock.Setup(p => p.GetTargetScalers()).Returns(new List { targetScaler1 }); + + IOptions options = Options.Create(new CustomScaleOptions() + { + IsTargetBasedScalingEnabled = tbsEnabled, + TargetBasedScalingEnabledForTrigger = true + }); + + ScaleManager scaleManager = new ScaleManager(_monitorManagerMock.Object, _targetScalerManagerMock.Object, _metricsRepositoryMock.Object, _concurrencyStatusRepositoryMock.Object, options, _loggerFactory); + + var status = await scaleManager.GetScaleStatusAsync(context); + + var logs = _loggerProvider.GetAllLogMessages().ToArray(); + if (!tbsEnabled) + { + Assert.Equal("Computing scale status (WorkerCount=3)", logs[0].FormattedMessage); + Assert.Equal("3 scale monitors to sample", logs[1].FormattedMessage); + Assert.Equal("Monitor 'func1-test-test' voted 'ScaleIn'", logs[2].FormattedMessage); + Assert.Equal("Monitor 'testscalemonitor1' voted 'ScaleOut'", logs[3].FormattedMessage); + Assert.Equal("Monitor 'testscalemonitor2' voted 'ScaleIn'", logs[4].FormattedMessage); + Assert.Equal(ScaleVote.ScaleOut, status.Vote); + Assert.Equal(null, status.TargetWorkerCount); + } + else + { + Assert.Equal("1 target scalers to sample", logs[0].FormattedMessage); + Assert.Equal("Snapshot dynamic concurrency for target scaler 'func1' is '1'", logs[1].FormattedMessage); + Assert.Equal("Target worker count for 'func1' is '2'", logs[2].FormattedMessage); + Assert.Equal(ScaleVote.ScaleIn, status.Vote); + Assert.Equal(2, status.TargetWorkerCount); + } + } + + [Fact] + public async Task GetScaleStatus_MonitorFails_ReturnsExpectedResult() + { + var context = new ScaleStatusContext + { + WorkerCount = 3 + }; + + var mockMonitor1 = new Mock(MockBehavior.Strict); + mockMonitor1.Setup(p => p.GetScaleStatus(It.Is(q => q.WorkerCount == context.WorkerCount))).Returns(new ScaleStatus { Vote = ScaleVote.ScaleIn }); + mockMonitor1.SetupGet(p => p.Descriptor).Returns(new ScaleMonitorDescriptor("testscalemonitor1")); + var mockMonitor2 = new Mock(MockBehavior.Strict); + mockMonitor2.SetupGet(p => p.Descriptor).Returns(new ScaleMonitorDescriptor("testscalemonitor2")); + var exception = new Exception("Kaboom!"); + mockMonitor2.Setup(p => p.GetScaleStatus(It.Is(q => q.WorkerCount == context.WorkerCount))).Throws(exception); + var mockMonitor3 = new Mock(MockBehavior.Strict); + mockMonitor3.Setup(p => p.GetScaleStatus(It.Is(q => q.WorkerCount == context.WorkerCount))).Returns(new ScaleStatus { Vote = ScaleVote.ScaleIn }); + mockMonitor3.SetupGet(p => p.Descriptor).Returns(new ScaleMonitorDescriptor("testscalemonitor3")); + List monitors = new List + { + mockMonitor1.Object, + mockMonitor2.Object, + mockMonitor3.Object + }; + + _monitorManagerMock.Setup(p => p.GetMonitors()).Returns(monitors); + + var monitorMetrics = new Dictionary> + { + { mockMonitor1.Object, new List() }, + { mockMonitor2.Object, new List() }, + { mockMonitor3.Object, new List() } + }; + _metricsRepositoryMock.Setup(p => p.ReadMetricsAsync(It.IsAny>())).ReturnsAsync(monitorMetrics); + + ScaleManager scaleManager = new ScaleManager(_monitorManagerMock.Object, _targetScalerManagerMock.Object, _metricsRepositoryMock.Object, _concurrencyStatusRepositoryMock.Object, _scaleOptions, _loggerFactory); + var status = await scaleManager.GetScaleStatusAsync(context); + + var logs = _loggerProvider.GetAllLogMessages().ToArray(); + Assert.Equal("Computing scale status (WorkerCount=3)", logs[0].FormattedMessage); + Assert.Equal("3 scale monitors to sample", logs[1].FormattedMessage); + Assert.Equal("Monitor 'testscalemonitor1' voted 'ScaleIn'", logs[2].FormattedMessage); + Assert.Equal("Failed to query scale status for monitor 'testscalemonitor2'.", logs[3].FormattedMessage); + Assert.Same(exception, logs[3].Exception); + Assert.Equal("Monitor 'testscalemonitor3' voted 'ScaleIn'", logs[4].FormattedMessage); + + Assert.Equal(null, status.TargetWorkerCount); + Assert.Equal(ScaleVote.ScaleIn, status.Vote); + } + + [Fact] + public async Task GetScaleStatus_TargetScalerFails_ReturnsExpectedResult() + { + var context = new ScaleStatusContext + { + WorkerCount = 3 + }; + + var targetScaler1 = new TestTargetScaler { Result = new TargetScalerResult { TargetWorkerCount = 3 }, TargetScalerDescriptor = new TargetScalerDescriptor("func1") }; + var targetScaler2 = new TestTargetScaler2 { Result = new TargetScalerResult { TargetWorkerCount = 1 }, TargetScalerDescriptor = new TargetScalerDescriptor("func2") }; + var targetScaler3 = new TestTargetScaler { Result = new TargetScalerResult { TargetWorkerCount = -3 }, TargetScalerDescriptor = new TargetScalerDescriptor("func3") }; + List targetScalers = new List + { + targetScaler1, + targetScaler2, + targetScaler3 + }; + _targetScalerManagerMock.Setup(p => p.GetTargetScalers()).Returns(targetScalers); + + IOptions options = Options.Create(new CustomScaleOptions() + { + IsTargetBasedScalingEnabled = true, + TargetBasedScalingEnabledForTrigger = true, + }); + + ScaleManager scaleManager = new ScaleManager(_monitorManagerMock.Object, _targetScalerManagerMock.Object, _metricsRepositoryMock.Object, _concurrencyStatusRepositoryMock.Object, options, _loggerFactory); + + var status = await scaleManager.GetScaleStatusAsync(context); + + var logs = _loggerProvider.GetAllLogMessages().ToArray(); + Assert.Equal("3 target scalers to sample", logs[0].FormattedMessage); + Assert.Equal($"Snapshot dynamic concurrency for target scaler 'func1' is '1'", logs[1].FormattedMessage); + Assert.Equal("Target worker count for 'func1' is '3'", logs[2].FormattedMessage); + Assert.Equal("Failed to query scale result for target scaler 'func2'.", logs[3].FormattedMessage); + Assert.Same("test", logs[3].Exception.Message); + Assert.Equal("Target worker count for 'func3' is '-3'", logs[4].FormattedMessage); + + Assert.Equal(3, status.TargetWorkerCount); + Assert.Equal(ScaleVote.None, status.Vote); + } + + [Theory] + [InlineData(0, 0, 0, ScaleVote.None)] + [InlineData(1, 0, 0, ScaleVote.ScaleIn)] + [InlineData(1, 1, 3, ScaleVote.ScaleOut)] + [InlineData(0, 0, 1, ScaleVote.None)] + [InlineData(1, 0, 1, ScaleVote.ScaleIn)] + [InlineData(5, 0, 3, ScaleVote.ScaleIn)] + public void GetAggregateScaleVote_ReturnsExpectedResult(int workerCount, int numScaleOutVotes, int numScaleInVotes, ScaleVote expected) + { + var context = new ScaleStatusContext + { + WorkerCount = workerCount + }; + List votes = new List(); + for (int i = 0; i < numScaleOutVotes; i++) + { + votes.Add(ScaleVote.ScaleOut); + } + for (int i = 0; i < numScaleInVotes; i++) + { + votes.Add(ScaleVote.ScaleIn); + } + var vote = ScaleManager.GetAggregateScaleVote(votes, context, _testLogger); + Assert.Equal(expected, vote); + } + + [Theory] + [InlineData(false, true, 1, 0)] + [InlineData(true, false, 1, 0)] + [InlineData(true, true, 0, 1)] + public void GetScalersToSample_Returns_Expected(bool targetBaseScalingEnabled, bool triggerEabled, int expectedScaleMonitorCount, int expectedTargetScalerCount) + { + List scaleMonitors = new List + { + new TestScaleMonitor("func1-test-test", "func1"), + }; + Mock scaleMonitorManagerMock = new Mock(MockBehavior.Strict); + scaleMonitorManagerMock.Setup(x => x.GetMonitors()).Returns(scaleMonitors); + + List targetScalers = new List + { + new TestTargetScaler() + { + TargetScalerDescriptor = new TargetScalerDescriptor("func1") + } + }; + Mock targetScalerManagerMock = new Mock(MockBehavior.Strict); + targetScalerManagerMock.Setup(x => x.GetTargetScalers()).Returns(targetScalers); + + IOptions options = Options.Create(new CustomScaleOptions() + { + IsTargetBasedScalingEnabled = targetBaseScalingEnabled, + TargetBasedScalingEnabledForTrigger = triggerEabled, + }); + + ScaleManager manager = new ScaleManager(scaleMonitorManagerMock.Object, targetScalerManagerMock.Object, _metricsRepositoryMock.Object, + _concurrencyStatusRepositoryMock.Object, options, _loggerFactory); + + manager.GetScalersToSample(out List scaleMonitorsToProcess, out List targetScalesToProcess); + + Assert.Equal(scaleMonitorsToProcess.Count(), expectedScaleMonitorCount); + Assert.Equal(targetScalesToProcess.Count(), expectedTargetScalerCount); + } + + [Fact] + public async Task GetScalersToSample_FallsBackToMonitor_OnTargetScalerError() + { + List scaleMonitors = new List + { + new TestScaleMonitor("function1-test-test", "function1") + }; + Mock scaleMonitorManagerMock = new Mock(MockBehavior.Strict); + scaleMonitorManagerMock.Setup(x => x.GetMonitors()).Returns(scaleMonitors); + + List targetScalers = new List + { + new FaultyTargetScaler() + { + TargetScalerDescriptor = new TargetScalerDescriptor("function1") + }, + new TestTargetScaler2() + { + TargetScalerDescriptor = new TargetScalerDescriptor("function2") + } + }; + Mock targetScalerManagerMock = new Mock(MockBehavior.Strict); + targetScalerManagerMock.Setup(x => x.GetTargetScalers()).Returns(targetScalers); + + var context = new ScaleStatusContext() + { + WorkerCount = 1 + }; + + IOptions options = Options.Create(new CustomScaleOptions() + { + IsTargetBasedScalingEnabled = true, + TargetBasedScalingEnabledForTrigger = true + }); + + ScaleManager scaleManager = new ScaleManager(scaleMonitorManagerMock.Object, targetScalerManagerMock.Object, _metricsRepositoryMock.Object, _concurrencyStatusRepositoryMock.Object, options, _loggerFactory); + + scaleManager.GetScalersToSample(out List monitors1, out List scalers1); + Assert.Equal(monitors1.Count(), 0); + Assert.Equal(scalers1.Count(), 2); + ScaleStatus resutl1 = await scaleManager.GetScaleStatusAsync(context); + + Assert.Equal(resutl1.TargetWorkerCount, 1); + Assert.Equal(resutl1.Vote, ScaleVote.None); + var logs = _loggerProvider.GetAllLogMessages().Select(x => x.FormattedMessage).ToArray(); + Assert.Single(logs.Where(x => x == "Unable to use target based scaling for Function 'function1'. Metrics monitoring will be used.")); + _loggerProvider.ClearAllLogMessages(); + + scaleManager.GetScalersToSample(out List monitors2, out List scalers2); + Assert.Equal(monitors2.Count(), 1); + Assert.Equal(scalers2.Count(), 1); + ScaleStatus resutl2 = await scaleManager.GetScaleStatusAsync(context); + Assert.Equal(resutl2.TargetWorkerCount, null); + Assert.Equal(resutl2.Vote, ScaleVote.ScaleIn); + logs = _loggerProvider.GetAllLogMessages().Select(x => x.FormattedMessage).ToArray(); + Assert.Empty(logs.Where(x => x == "Unable to use target based scaling for Function 'function1'. Metrics monitoring will be used.")); + } + } + + public class CustomScaleOptions : ScaleOptions + { + public CustomScaleOptions() + { + IsTargetBasedScalingEnabledForTriggerFunc = IsTargetBasedScalingEnabledForTrigger; + } + + public bool TargetBasedScalingEnabledForTrigger { get; set; } + + private bool IsTargetBasedScalingEnabledForTrigger(ITargetScaler targetScaler) + { + return TargetBasedScalingEnabledForTrigger; + } + } +} diff --git a/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/ScaleMonitorServiceTests.cs b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/ScaleMonitorServiceTests.cs new file mode 100644 index 000000000..8315872a1 --- /dev/null +++ b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/ScaleMonitorServiceTests.cs @@ -0,0 +1,201 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Azure.WebJobs.Host.TestCommon; +using Microsoft.Azure.WebJobs.Host.UnitTests.Scale; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Moq; +using Xunit; + +namespace Microsoft.Azure.WebJobs.Host.UnitTests.Scale +{ + public class ScaleMonitorServiceTests + { + private readonly ScaleMonitorService _monitor; + private readonly TestMetricsRepository _metricsRepository; + private readonly TestLoggerProvider _loggerProvider; + private List _monitors; + private List _scalers; + + public ScaleMonitorServiceTests() + { + _monitors = new List(); + _scalers = new List(); + + _metricsRepository = new TestMetricsRepository(); + _loggerProvider = new TestLoggerProvider(); + ILoggerFactory loggerFactory = new LoggerFactory(); + loggerFactory.AddProvider(_loggerProvider); + + Mock functionsScaleManagerMock = new Mock(); + functionsScaleManagerMock.Setup(x => x.GetScalersToSample(out _monitors, out _scalers)); + + IOptions options = Options.Create(new ScaleOptions() + { + ScaleMetricsSampleInterval = TimeSpan.FromSeconds(1), + }); + + _monitor = new ScaleMonitorService(functionsScaleManagerMock.Object, _metricsRepository, options, loggerFactory); + } + + [Fact] + public async Task OnTimer_ExceptionsAreHandled() + { + var monitor = new TestScaleMonitor1 + { + Exception = new Exception("Kaboom!") + }; + _monitors.Add(monitor); + + await _monitor.StartAsync(CancellationToken.None); + + // wait for a few failures to happen + LogMessage[] logs = null; + await TestHelpers.Await(() => + { + logs = _loggerProvider.GetAllLogMessages().Where(p => p.Level == LogLevel.Error).ToArray(); + return logs.Length >= 3; + }); + + Assert.All(logs, + p => + { + Assert.Same(monitor.Exception, p.Exception); + Assert.Equal("Failed to collect scale metrics sample for monitor 'testscalemonitor1'.", p.FormattedMessage); + }); + } + + [Fact] + public async Task OnTimer_PersistsMetrics() + { + var testMetrics = new List + { + new TestScaleMetrics1 { Count = 10 }, + new TestScaleMetrics1 { Count = 15 }, + new TestScaleMetrics1 { Count = 45 }, + new TestScaleMetrics1 { Count = 50 }, + new TestScaleMetrics1 { Count = 100 } + }; + var monitor1 = new TestScaleMonitor1 + { + Metrics = testMetrics + }; + _monitors.Add(monitor1); + + await _monitor.StartAsync(CancellationToken.None); + + await TestHelpers.Await(() => + { + return _metricsRepository.Count >= 5; + }); + + var logs = _loggerProvider.GetAllLogMessages().ToArray(); + + var infoLogs = logs.Where(p => p.Level == LogLevel.Information); + Assert.Equal("Scale monitor service started is started.", logs[0].FormattedMessage); + Assert.Equal("Taking metrics samples for 1 monitor(s).", logs[1].FormattedMessage); + Assert.True(logs[2].FormattedMessage.StartsWith("Scale metrics sample for monitor 'testscalemonitor1': {\"Count\":10,")); + + var metricsWritten = _metricsRepository.Metrics[monitor1].Take(5); + Assert.Equal(testMetrics, metricsWritten); + } + + [Fact] + public async Task OnTimer_MonitorFailuresAreHandled() + { + var testMetrics1 = new List + { + new TestScaleMetrics1 { Count = 10 }, + new TestScaleMetrics1 { Count = 15 }, + new TestScaleMetrics1 { Count = 45 }, + new TestScaleMetrics1 { Count = 50 }, + new TestScaleMetrics1 { Count = 100 } + }; + var monitor1 = new TestScaleMonitor1 + { + Exception = new Exception("Kaboom!") + }; + _monitors.Add(monitor1); + + var testMetrics2 = new List + { + new TestScaleMetrics2 { Num = 300 }, + new TestScaleMetrics2 { Num = 350 }, + new TestScaleMetrics2 { Num = 400 }, + new TestScaleMetrics2 { Num = 450 }, + new TestScaleMetrics2 { Num = 500 } + }; + var monitor2 = new TestScaleMonitor2 + { + Metrics = testMetrics2 + }; + _monitors.Add(monitor2); + + await _monitor.StartAsync(CancellationToken.None); + + await TestHelpers.Await(() => + { + return _metricsRepository.Count >= 5; + }); + + var logs = _loggerProvider.GetAllLogMessages().ToArray(); + + var infoLogs = logs.Where(p => p.Level == LogLevel.Information); + Assert.Equal("Scale monitor service started is started.", logs[0].FormattedMessage); + Assert.Equal("Taking metrics samples for 2 monitor(s).", logs[1].FormattedMessage); + + // verify the failure logs for the failing monitor + Assert.True(logs.Count(p => p.FormattedMessage.Equals($"Failed to collect scale metrics sample for monitor 'testscalemonitor1'.")) >= 5); + + // verify each successful sample is logged + Assert.True(logs.Count(p => p.FormattedMessage.StartsWith($"Scale metrics sample for monitor 'testscalemonitor2'")) >= 5); + + var metricsWritten = _metricsRepository.Metrics[monitor2].Take(5); + Assert.Equal(testMetrics2, metricsWritten); + } + } + + public class TestMetricsRepository : IScaleMetricsRepository + { + private int _count; + + public TestMetricsRepository() + { + _count = 0; + Metrics = new Dictionary>(); + } + + public int Count => _count; + + public IDictionary> Metrics { get; set; } + + public Task>> ReadMetricsAsync(IEnumerable monitors) + { + return Task.FromResult>>(Metrics); + } + + public Task WriteMetricsAsync(IDictionary monitorMetrics) + { + foreach (var pair in monitorMetrics) + { + if (!Metrics.ContainsKey(pair.Key)) + { + Metrics[pair.Key] = new List(); + } + + Metrics[pair.Key].Add(pair.Value); + + Interlocked.Increment(ref _count); + } + + return Task.CompletedTask; + } + } +} diff --git a/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/TestScaleMonitor.cs b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/TestScaleMonitor.cs new file mode 100644 index 000000000..6bd6f2e19 --- /dev/null +++ b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/TestScaleMonitor.cs @@ -0,0 +1,114 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Azure.WebJobs.Host.Scale; + +namespace Microsoft.Azure.WebJobs.Host.UnitTests.Scale +{ + public class TestScaleMonitor : IScaleMonitor where TMetrics : ScaleMetrics + { + private readonly ScaleMonitorDescriptor _descriptor; + + public TestScaleMonitor() + { + Index = 0; + _descriptor = new ScaleMonitorDescriptor(GetType().Name.ToLower()); + } + + public TestScaleMonitor(string descriptorId, string functionId) + { + Index = 0; + _descriptor = new ScaleMonitorDescriptor(descriptorId, functionId); + } + + public ScaleMonitorDescriptor Descriptor => _descriptor; + + public Exception Exception { get; set; } + + public int Index { get; set; } + + public List Metrics { get; set; } + + public ScaleStatus Status { get; set; } + + Task IScaleMonitor.GetMetricsAsync() + { + if (Exception != null) + { + throw Exception; + } + + return Task.FromResult(Metrics[Index++ % Metrics.Count]); + } + + public async Task GetMetricsAsync() + { + return (TMetrics)(await GetMetricsAsync()); + } + + ScaleStatus IScaleMonitor.GetScaleStatus(ScaleStatusContext context) + { + return Status; + } + + public ScaleStatus GetScaleStatus(ScaleStatusContext context) + { + return ((IScaleMonitor)this).GetScaleStatus(context); + } + } + + public class TestScaleMonitor1 : TestScaleMonitor + { + } + + public class TestScaleMetrics1 : ScaleMetrics + { + public long Count { get; set; } + } + + public class TestScaleMonitor2 : TestScaleMonitor + { + } + + public class TestScaleMetrics2 : ScaleMetrics + { + public int Num { get; set; } + } + + public class TestScaleMonitor3 : TestScaleMonitor + { + } + + public class TestScaleMetrics3 : ScaleMetrics + { + public TimeSpan TimeSpan { get; set; } + + public int Length { get; set; } + } + + public class TestInvalidScaleMonitor : IScaleMonitor + { + private readonly ScaleMonitorDescriptor _descriptor; + + public TestInvalidScaleMonitor() + { + _descriptor = new ScaleMonitorDescriptor("testmonitor-invalid"); + } + + public ScaleMonitorDescriptor Descriptor => _descriptor; + + public Task GetMetricsAsync() + { + throw new NotImplementedException(); + } + + public ScaleStatus GetScaleStatus(ScaleStatusContext context) + { + throw new NotImplementedException(); + } + } +} diff --git a/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/TestTargetScaler.cs b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/TestTargetScaler.cs new file mode 100644 index 000000000..ebf6a425c --- /dev/null +++ b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/TestTargetScaler.cs @@ -0,0 +1,37 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Threading.Tasks; +using Microsoft.Azure.WebJobs.Host.Scale; + +namespace Microsoft.Azure.WebJobs.Host.UnitTests.Scale +{ + internal class TestTargetScaler : ITargetScaler + { + public TargetScalerDescriptor TargetScalerDescriptor { get; set; } + + public TargetScalerResult Result { get; set; } + + public virtual Task GetScaleResultAsync(TargetScalerContext context) + { + return Task.FromResult(Result); + } + } + + internal class TestTargetScaler2 : TestTargetScaler + { + public override Task GetScaleResultAsync(TargetScalerContext context) + { + throw new Exception("test"); + } + } + + internal class FaultyTargetScaler : TestTargetScaler + { + public override Task GetScaleResultAsync(TargetScalerContext context) + { + throw new NotSupportedException("test"); + } + } +}