diff --git a/build/common.props b/build/common.props index d73d80333..045e3df70 100644 --- a/build/common.props +++ b/build/common.props @@ -1,8 +1,8 @@ - 3.0.36$(VersionSuffix) - 5.0.0-beta.2$(VersionSuffix) + 3.0.37$(VersionSuffix) + 5.0.0$(VersionSuffix) 4.0.3$(VersionSuffix) netstandard2.0 diff --git a/sample/SampleHost/SampleHost.csproj b/sample/SampleHost/SampleHost.csproj index 35c3eccca..c87663b08 100644 --- a/sample/SampleHost/SampleHost.csproj +++ b/sample/SampleHost/SampleHost.csproj @@ -15,7 +15,7 @@ - + diff --git a/src/Microsoft.Azure.WebJobs.Host.Storage/StorageServiceCollectionExtensions.cs b/src/Microsoft.Azure.WebJobs.Host.Storage/StorageServiceCollectionExtensions.cs index 4a2102f60..4e8737aa3 100644 --- a/src/Microsoft.Azure.WebJobs.Host.Storage/StorageServiceCollectionExtensions.cs +++ b/src/Microsoft.Azure.WebJobs.Host.Storage/StorageServiceCollectionExtensions.cs @@ -36,6 +36,13 @@ public static void AddAzureStorageCoreServices(this IServiceCollection services) services.AddSingleton(); } + public static void AddAzureStorageScaleServices(this IServiceCollection services) + { + services.TryAddEnumerable(ServiceDescriptor.Transient, CoreWebJobsOptionsSetup>()); + services.TryAddSingleton(); + services.AddSingleton(); + } + // This is only called if the host didn't already provide an implementation private static IDistributedLockManager Create(IServiceProvider provider) { diff --git a/src/Microsoft.Azure.WebJobs.Host/Constants.cs b/src/Microsoft.Azure.WebJobs.Host/Constants.cs index ca9fad522..568175142 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Constants.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Constants.cs @@ -15,5 +15,6 @@ internal static class Constants public const string AzureWebsiteInstanceId = "WEBSITE_INSTANCE_ID"; public const string AzureWebsiteContainerName = "CONTAINER_NAME"; public const string DateTimeFormatString = "yyyy'-'MM'-'dd'T'HH':'mm':'ss'.'fffK"; + public const string HostingConfigSectionName = "HostingConfig"; } } diff --git a/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsHostBuilderExtensions.cs b/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsHostBuilderExtensions.cs index e48884109..c58a13ab9 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsHostBuilderExtensions.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsHostBuilderExtensions.cs @@ -4,6 +4,7 @@ using System; using System.Linq; using Microsoft.Azure.WebJobs; +using Microsoft.Azure.WebJobs.Host.Scale; using Microsoft.Azure.WebJobs.Hosting; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration.EnvironmentVariables; @@ -135,6 +136,26 @@ public static IHostBuilder ConfigureWebJobs(this IHostBuilder builder, Action + /// Configures the specified as a scale manager host. This method is used for internal infrastructure only. + /// + /// The to configure. + /// Configuration action to perform as part of service configuration. + /// Configuration action for . + /// The . + public static IHostBuilder ConfigureWebJobsScale(this IHostBuilder builder, + Action configure, + Action configureScaleOptions) + { + builder.ConfigureServices((context, services) => + { + IWebJobsBuilder webJobsBuilder = services.AddWebJobsScale(configureScaleOptions); + configure?.Invoke(context, webJobsBuilder); + }); + + return builder; + } + private static IConfigurationBuilder TryAddDefaultConfigurationSources(this IConfigurationBuilder config) { if (!config.Sources.OfType().Any(p => string.Equals(p.Path, "appsettings.json", StringComparison.OrdinalIgnoreCase))) diff --git a/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsServiceCollectionExtensions.cs b/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsServiceCollectionExtensions.cs index 68624900e..2f2945924 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsServiceCollectionExtensions.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Hosting/WebJobsServiceCollectionExtensions.cs @@ -32,6 +32,7 @@ public static class WebJobsServiceCollectionExtensions { private const string SingletonConfigSectionName = "Singleton"; private const string ConcurrencyConfigSectionName = "Concurrency"; + private const string ScaleConfigSectionName = "Scale"; /// /// Adds the WebJobs services to the provided . @@ -87,8 +88,9 @@ public static IWebJobsBuilder AddWebJobs(this IServiceCollection services, Actio services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); - services.TryAddSingleton(); - services.TryAddSingleton(); + + services.AddCommonScaleServices(); + services.TryAddSingleton(); services.AddSingleton(); services.AddSingleton(); @@ -116,12 +118,7 @@ public static IWebJobsBuilder AddWebJobs(this IServiceCollection services, Actio services.AddSingleton(typeof(IWebJobsExtensionConfiguration<>), typeof(WebJobsExtensionConfiguration<>)); // Options logging - services.AddTransient(typeof(OptionsFactory<>)); - services.AddTransient(typeof(IOptionsFactory<>), typeof(WebJobsOptionsFactory<>)); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton, LoggerFilterOptionsFormatter>(); - + services.AddOptionsLogging(); // Concurrency management services.TryAddSingleton(); services.TryAddSingleton(); @@ -159,5 +156,58 @@ public static IWebJobsBuilder AddWebJobs(this IServiceCollection services, Actio return builder; } + + /// + /// Adds the WebJobs scale services to the provided . + /// + /// + /// + public static IWebJobsBuilder AddWebJobsScale(this IServiceCollection services, Action configure) + { + if (services == null) + { + throw new ArgumentNullException(nameof(services)); + } + + services.AddCommonScaleServices(); + services.TryAddSingleton(new PrimaryHostStateProvider() { IsPrimary = true }); + + if (configure != null) + { + services.Configure(configure); + } + + services.AddOptionsLogging(); + + var builder = new WebJobsBuilder(services); + return builder; + } + + /// + /// Adds scale services common to both normal WebJobs and ScaleHost scenarios. + /// + /// + private static void AddCommonScaleServices(this IServiceCollection services) + { + services.AddOptions() + .Configure((options, config) => + { + var section = config.GetWebJobsRootConfiguration().GetSection(ScaleConfigSectionName); + section.Bind(options); + }); + services.TryAddSingleton(); + services.TryAddSingleton(); + services.TryAddSingleton(); + services.AddHostedService(); + } + + private static void AddOptionsLogging(this IServiceCollection services) + { + services.AddTransient(typeof(OptionsFactory<>)); + services.AddTransient(typeof(IOptionsFactory<>), typeof(WebJobsOptionsFactory<>)); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton, LoggerFilterOptionsFormatter>(); + } } } diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/AggregateScaleStatus.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/AggregateScaleStatus.cs new file mode 100644 index 000000000..2f1fb2987 --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/AggregateScaleStatus.cs @@ -0,0 +1,33 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System.Collections.Generic; + +namespace Microsoft.Azure.WebJobs.Host.Scale +{ + /// + /// Represents the aggregate scale status across all functions being monitored by the host. + /// + public class AggregateScaleStatus + { + /// + /// Gets or sets the aggregate scale vote. + /// + public ScaleVote Vote { get; set; } + + /// + /// Gets or sets the aggregate target worker count. + /// + public int? TargetWorkerCount { get; set; } + + /// + /// Gets or sets the individual s for all functions being monitored. + /// + public IDictionary FunctionScaleStatuses { get; set; } + + /// + /// Gets or sets the individual s for all functions being monitored. + /// + public IDictionary FunctionTargetScalerResults { get; set; } + } +} diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/IScaleMetricsRepository.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/IScaleMetricsRepository.cs new file mode 100644 index 000000000..2f836b1be --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/IScaleMetricsRepository.cs @@ -0,0 +1,28 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Microsoft.Azure.WebJobs.Host.Scale +{ + /// + /// Interface defining methods for reading/writing scale metrics to a persistent store. + /// + public interface IScaleMetricsRepository + { + /// + /// Persist the metrics for each monitor. + /// + /// The collection of metrics for each monitor. + /// A task. + Task WriteMetricsAsync(IDictionary monitorMetrics); + + /// + /// Read the metrics. + /// + /// The current collection of monitors. + /// Map of metrics per monitor. + Task>> ReadMetricsAsync(IEnumerable monitors); + } +} diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/IScaleStatusProvider.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/IScaleStatusProvider.cs new file mode 100644 index 000000000..3a89c04be --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/IScaleStatusProvider.cs @@ -0,0 +1,21 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System.Text; +using System.Threading.Tasks; + +namespace Microsoft.Azure.WebJobs.Host.Scale +{ + /// + /// Interface for providing aggregate scale status across all functions being monitored by the host. + /// + public interface IScaleStatusProvider + { + /// + /// Gets the scale status for all functions being monitored by the host. + /// + /// The . + /// A task that returns the . + Task GetScaleStatusAsync(ScaleStatusContext context); + } +} diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/NullScaleMetricsRepository.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/NullScaleMetricsRepository.cs new file mode 100644 index 000000000..bba31d29c --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/NullScaleMetricsRepository.cs @@ -0,0 +1,23 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Microsoft.Azure.WebJobs.Host.Scale +{ + internal class NullScaleMetricsRepository : IScaleMetricsRepository + { + private IDictionary> _emptyMetrics = new Dictionary>(); + + public Task WriteMetricsAsync(IDictionary monitorMetrics) + { + return Task.CompletedTask; + } + + public Task>> ReadMetricsAsync(IEnumerable monitors) + { + return Task.FromResult((IDictionary>)_emptyMetrics); + } + } +} diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleManager.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleManager.cs new file mode 100644 index 000000000..659861328 --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleManager.cs @@ -0,0 +1,297 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace Microsoft.Azure.WebJobs.Host.Scale +{ + /// + /// Manages scale monitoring operations. + /// + internal class ScaleManager : IScaleStatusProvider + { + private readonly IScaleMonitorManager _monitorManager; + private readonly ITargetScalerManager _targetScalerManager; + private readonly IScaleMetricsRepository _metricsRepository; + private readonly IConcurrencyStatusRepository _concurrencyStatusRepository; + private readonly ILogger _logger; + private readonly IConfiguration _configuration; + private IOptions _scaleOptions; + private static HashSet _targetScalersInError = new HashSet(); + + public ScaleManager( + IScaleMonitorManager monitorManager, + ITargetScalerManager targetScalerManager, + IScaleMetricsRepository metricsRepository, + IConcurrencyStatusRepository concurrencyStatusRepository, + IOptions scaleConfiguration, + ILoggerFactory loggerFactory, + IConfiguration configuration) + { + _monitorManager = monitorManager; + _targetScalerManager = targetScalerManager; + _metricsRepository = metricsRepository; + _concurrencyStatusRepository = concurrencyStatusRepository; + _logger = loggerFactory?.CreateLogger(); + _targetScalersInError = new HashSet(); + _scaleOptions = scaleConfiguration; + _configuration = configuration; + } + + // for mock testing only + internal ScaleManager() + { + } + + /// + /// Gets the scale status for all functions being monitored by the host. + /// + /// The . + /// A task that returns the . + public async Task GetScaleStatusAsync(ScaleStatusContext context) + { + var (scaleMonitorsToProcess, targetScalersToProcess) = GetScalersToSample(_monitorManager, _targetScalerManager, _scaleOptions, _configuration); + + var scaleStatuses = await GetScaleMonitorsResultAsync(context, scaleMonitorsToProcess); + var targetScalerResults = await GetTargetScalersResultAsync(context, targetScalersToProcess); + + var aggregateScaleStatus = new AggregateScaleStatus + { + Vote = GetAggregateScaleVote(scaleStatuses.Values.Select(x => x.Vote), context, _logger), + TargetWorkerCount = targetScalerResults.Any() ? targetScalerResults.Max(x => x.Value.TargetWorkerCount) : null, + FunctionScaleStatuses = scaleStatuses, + FunctionTargetScalerResults = targetScalerResults + }; + + // Set correct vote if all the triggers are target + if (!scaleStatuses.Any() && aggregateScaleStatus.TargetWorkerCount.HasValue) + { + aggregateScaleStatus.Vote = (ScaleVote)aggregateScaleStatus.TargetWorkerCount.Value.CompareTo(context.WorkerCount); + } + + return aggregateScaleStatus; + } + + private async Task> GetScaleMonitorsResultAsync(ScaleStatusContext context, IEnumerable scaleMonitorsToProcess) + { + Dictionary votes = new Dictionary(); + if (scaleMonitorsToProcess != null && scaleMonitorsToProcess.Any()) + { + // get the collection of current metrics for each monitor + var monitorMetrics = await _metricsRepository.ReadMetricsAsync(scaleMonitorsToProcess); + + _logger.LogDebug($"Computing scale status (WorkerCount={context.WorkerCount})"); + _logger.LogDebug($"{monitorMetrics.Count} scale monitors to sample"); + + // for each monitor, ask it to return its scale status (vote) based on + // the metrics and context info (e.g. worker count) + foreach (var pair in monitorMetrics) + { + var monitor = pair.Key; + var metrics = pair.Value; + + try + { + // create a new context instance to avoid modifying the + // incoming context + var scaleStatusContext = new ScaleStatusContext + { + WorkerCount = context.WorkerCount, + Metrics = metrics + }; + var result = monitor.GetScaleStatus(scaleStatusContext); + + _logger.LogDebug($"Monitor '{monitor.Descriptor.Id}' voted '{result.Vote.ToString()}'"); + string key = monitor.Descriptor.FunctionId ?? monitor.Descriptor.Id; + votes.Add(key, new ScaleStatus() + { + Vote = result.Vote + }); + } + catch (Exception exc) when (!exc.IsFatal()) + { + // if a particular monitor fails, log and continue + _logger.LogError(exc, $"Failed to query scale status for monitor '{monitor.Descriptor.Id}'."); + } + } + } + else + { + // no monitors registered + // this can happen if the host is offline + } + + return votes; + } + + private async Task> GetTargetScalersResultAsync(ScaleStatusContext context, IEnumerable targetScalersToProcess) + { + Dictionary targetScaleVotes = new Dictionary(); + + if (targetScalersToProcess != null && targetScalersToProcess.Any()) + { + _logger.LogDebug($"{targetScalersToProcess.Count()} target scalers to sample"); + HostConcurrencySnapshot snapshot = null; + try + { + snapshot = await _concurrencyStatusRepository.ReadAsync(CancellationToken.None); + } + catch (Exception exc) when (!exc.IsFatal()) + { + _logger.LogError(exc, $"Failed to read concurrency status repository"); + } + + foreach (var targetScaler in targetScalersToProcess) + { + try + { + TargetScalerContext targetScaleStatusContext = new TargetScalerContext(); + if (snapshot != null) + { + if (snapshot.FunctionSnapshots.TryGetValue(targetScaler.TargetScalerDescriptor.FunctionId, out var functionSnapshot)) + { + targetScaleStatusContext.InstanceConcurrency = functionSnapshot.Concurrency; + _logger.LogDebug($"Snapshot dynamic concurrency for target scaler '{targetScaler.TargetScalerDescriptor.FunctionId}' is '{functionSnapshot.Concurrency}'"); + } + } + TargetScalerResult result = null; + try + { + result = await targetScaler.GetScaleResultAsync(targetScaleStatusContext); + } + catch (NotSupportedException ex) + { + string targetScalerUniqueId = GetTargetScalerFunctionUniqueId(targetScaler); + _logger.LogWarning($"Unable to use target based scaling for Function '{targetScaler.TargetScalerDescriptor.FunctionId}'. Metrics monitoring will be used.", ex); + lock (_targetScalersInError) + { + _targetScalersInError.Add(targetScalerUniqueId); + } + + // Adding ScaleVote.None vote + result = new TargetScalerResult + { + TargetWorkerCount = context.WorkerCount + }; + } + _logger.LogDebug($"Target worker count for '{targetScaler.TargetScalerDescriptor.FunctionId}' is '{result.TargetWorkerCount}'"); + + targetScaleVotes.Add(targetScaler.TargetScalerDescriptor.FunctionId, result); + } + catch (Exception exc) when (!exc.IsFatal()) + { + // if a particular target scaler fails, log and continue + _logger.LogError(exc, $"Failed to query scale result for target scaler '{targetScaler.TargetScalerDescriptor.FunctionId}'."); + } + } + } + return targetScaleVotes; + } + + /// + /// Returns scale monitors and target scalers we want to use based on the configuration. + /// Scaler monitor will be ignored if a target scaler is defined in the same extensions assembly and TBS is enabled. + /// + internal static (List, List) GetScalersToSample( + IScaleMonitorManager monitorManager, + ITargetScalerManager targetScalerManager, + IOptions scaleOptions, + IConfiguration configuration) + { + var scaleMonitors = monitorManager.GetMonitors(); + var targetScalers = targetScalerManager.GetTargetScalers(); + + var scaleMonitorsToSample = new List(); + var targetScalersToSample = new List(); + + // Check if TBS enabled on app level + if (scaleOptions.Value.IsTargetScalingEnabled) + { + HashSet targetScalerFunctions = new HashSet(); + foreach (var scaler in targetScalers) + { + string scalerUniqueId = GetTargetScalerFunctionUniqueId(scaler); + if (!_targetScalersInError.Contains(scalerUniqueId)) + { + string assemblyName = GetAssemblyName(scaler.GetType()); + bool featureEnabled = configuration.GetSection(Constants.HostingConfigSectionName).GetValue(assemblyName) == "1"; + if (featureEnabled) + { + targetScalersToSample.Add(scaler); + targetScalerFunctions.Add(scalerUniqueId); + } + } + } + + foreach (var monitor in scaleMonitors) + { + string monitorUniqueId = GetScaleMonitorFunctionUniqueId(monitor); + // Check if target based scaler exists for the function + if (!targetScalerFunctions.Contains(monitorUniqueId)) + { + scaleMonitorsToSample.Add(monitor); + } + } + } + else + { + scaleMonitorsToSample.AddRange(scaleMonitors); + } + + return (scaleMonitorsToSample, targetScalersToSample); + } + + internal static ScaleVote GetAggregateScaleVote(IEnumerable votes, ScaleStatusContext context, ILogger logger) + { + ScaleVote vote = ScaleVote.None; + if (votes.Any()) + { + // aggregate all the votes into a single vote + if (votes.Any(p => p == ScaleVote.ScaleOut)) + { + // scale out if at least 1 monitor requires it + logger?.LogDebug("Scaling out based on votes"); + vote = ScaleVote.ScaleOut; + } + else if (context.WorkerCount > 0 && votes.All(p => p == ScaleVote.ScaleIn)) + { + // scale in only if all monitors vote scale in + logger?.LogDebug("Scaling in based on votes"); + vote = ScaleVote.ScaleIn; + } + } + else if (context.WorkerCount > 0) + { + // if no functions exist or are enabled we'll scale in + logger?.LogDebug("No enabled functions or scale votes so scaling in"); + vote = ScaleVote.ScaleIn; + } + + return vote; + } + + private static string GetTargetScalerFunctionUniqueId(ITargetScaler scaler) + { + return $"{GetAssemblyName(scaler.GetType())}-{scaler.TargetScalerDescriptor.FunctionId}"; + } + + + private static string GetScaleMonitorFunctionUniqueId(IScaleMonitor monitor) + { + return $"{GetAssemblyName(monitor.GetType())}-{monitor.Descriptor.FunctionId}"; + } + + + private static string GetAssemblyName(Type type) + { + return type.Assembly.GetName().Name; + } + } +} diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleMonitorManager.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleMonitorManager.cs index 7b61854fe..d6a3ee391 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleMonitorManager.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleMonitorManager.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System.Collections.Generic; +using System.Linq; namespace Microsoft.Azure.WebJobs.Host.Scale { @@ -14,12 +15,15 @@ public ScaleMonitorManager() { } - public ScaleMonitorManager(IEnumerable monitors) + public ScaleMonitorManager(IEnumerable monitors, IEnumerable monitorProviders) { // add any initial monitors coming from DI // additional monitors can be added at runtime // via Register _monitors.AddRange(monitors); + + // add monitors coming from any registered providers + _monitors.AddRange(monitorProviders.Select(p => p.GetMonitor())); } public void Register(IScaleMonitor monitor) diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleMonitorService.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleMonitorService.cs new file mode 100644 index 000000000..ea283eb43 --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleMonitorService.cs @@ -0,0 +1,167 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.WebJobs.Hosting; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Newtonsoft.Json; + +namespace Microsoft.Azure.WebJobs.Host.Scale +{ + /// + /// Service responsible for taking periodic scale metrics samples and persisting them. + /// + internal class ScaleMonitorService : IHostedService, IDisposable + { + private readonly Timer _timer; + private readonly IScaleStatusProvider _scaleStausProvider; + private readonly IScaleMetricsRepository _metricsRepository; + private readonly ILogger _logger; + private readonly IOptions _scaleOptions; + private readonly IPrimaryHostStateProvider _primaryHostStateProvider; + private readonly IScaleMonitorManager _monitorManager; + private readonly ITargetScalerManager _targetScalerManager; + private readonly IConfiguration _configuration; + private bool _disposed; + + public ScaleMonitorService( + IScaleStatusProvider scaleStausProvider, + IScaleMetricsRepository metricsRepository, + IOptions scaleOptions, + IPrimaryHostStateProvider primaryHostStateProvider, + IScaleMonitorManager monitorManager, + ITargetScalerManager targetScalerManager, + IConfiguration configuration, + ILoggerFactory loggerFactory) + { + _scaleStausProvider = scaleStausProvider; + _metricsRepository = metricsRepository; + _timer = new Timer(OnTimer, null, Timeout.Infinite, Timeout.Infinite); + _logger = loggerFactory.CreateLogger(); + _scaleOptions = scaleOptions; + _primaryHostStateProvider = primaryHostStateProvider; + _monitorManager = monitorManager; + _targetScalerManager = targetScalerManager; + _configuration = configuration; + } + + public Task StartAsync(CancellationToken cancellationToken) + { + if (_scaleOptions.Value.IsRuntimeScalingEnabled) + { + _logger.LogInformation("Runtime scale monitoring is enabled."); + + // start the timer by setting the due time + SetTimerInterval((int)_scaleOptions.Value.ScaleMetricsSampleInterval.TotalMilliseconds); + } + + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + // stop the timer if it has been started + _timer.Change(Timeout.Infinite, Timeout.Infinite); + + return Task.CompletedTask; + } + + protected async virtual void OnTimer(object state) + { + if (_primaryHostStateProvider.IsPrimary) + { + await TakeMetricsSamplesAsync(); + } + + SetTimerInterval((int)_scaleOptions.Value.ScaleMetricsSampleInterval.TotalMilliseconds); + } + + public void Dispose() + { + Dispose(true); + } + + private async Task TakeMetricsSamplesAsync() + { + try + { + var (scaleMonitorsToProcess, targetScalersToSample) = ScaleManager.GetScalersToSample(_monitorManager, _targetScalerManager, _scaleOptions, _configuration); + + if (scaleMonitorsToProcess.Any()) + { + _logger.LogDebug($"Taking metrics samples for {scaleMonitorsToProcess.Count()} monitor(s)."); + + var metricsMap = new Dictionary(); + foreach (var monitor in scaleMonitorsToProcess) + { + ScaleMetrics metrics = null; + try + { + // take a metrics sample for each monitor + metrics = await monitor.GetMetricsAsync(); + metricsMap[monitor] = metrics; + + // log the metrics json to provide visibility into monitor activity + var json = JsonConvert.SerializeObject(metrics); + _logger.LogDebug($"Scale metrics sample for monitor '{monitor.Descriptor.Id}': {json}"); + } + catch (Exception exc) when (!exc.IsFatal()) + { + // if a particular monitor fails, log and continue + _logger.LogError(exc, $"Failed to collect scale metrics sample for monitor '{monitor.Descriptor.Id}'."); + } + } + + if (metricsMap.Count > 0) + { + // persist the metrics samples + await _metricsRepository.WriteMetricsAsync(metricsMap); + } + } + } + catch (Exception exc) when (!exc.IsFatal()) + { + _logger.LogError(exc, "Failed to collect/persist scale metrics."); + } + } + + private void SetTimerInterval(int dueTime) + { + if (!_disposed) + { + var timer = _timer; + if (timer != null) + { + try + { + _timer.Change(dueTime, Timeout.Infinite); + } + catch (ObjectDisposedException) + { + // might race with dispose + } + } + } + } + + private void Dispose(bool disposing) + { + if (!_disposed) + { + if (disposing) + { + _timer.Dispose(); + } + + _disposed = true; + } + } + } +} diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleOptions.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleOptions.cs new file mode 100644 index 000000000..7a9ff185c --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/ScaleOptions.cs @@ -0,0 +1,109 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using Microsoft.Azure.WebJobs.Hosting; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; + +namespace Microsoft.Azure.WebJobs.Host.Scale +{ + /// + /// Options used to configure scale monitoring. + /// + public class ScaleOptions : IOptionsFormatter + { + private TimeSpan _scaleMetricsMaxAge; + private TimeSpan _scaleMetricsSampleInterval; + + public ScaleOptions() + { + // At the default values, a single monitor will be generating 6 samples per minute + // so at 2 minutes that's 12 samples + // Assume a case of 100 functions in an app, each mapping to a monitor. Thats + // 1200 samples to read from storage on each scale status request. + ScaleMetricsMaxAge = TimeSpan.FromMinutes(2); + ScaleMetricsSampleInterval = TimeSpan.FromSeconds(10); + MetricsPurgeEnabled = true; + } + + /// + /// Gets or sets a value indicating the maximum age for metrics. + /// Metrics that exceed this age will not be returned to monitors. + /// + public TimeSpan ScaleMetricsMaxAge + { + get + { + return _scaleMetricsMaxAge; + } + + set + { + if (value < TimeSpan.FromMinutes(1) || value > TimeSpan.FromMinutes(5)) + { + throw new ArgumentOutOfRangeException(nameof(ScaleMetricsMaxAge)); + } + _scaleMetricsMaxAge = value; + } + } + + /// + /// Gets or sets the sampling interval for metrics. + /// + public TimeSpan ScaleMetricsSampleInterval + { + get + { + return _scaleMetricsSampleInterval; + } + + set + { + if (value < TimeSpan.FromSeconds(1) || value > TimeSpan.FromSeconds(30)) + { + throw new ArgumentOutOfRangeException(nameof(ScaleMetricsSampleInterval)); + } + _scaleMetricsSampleInterval = value; + } + } + + /// + /// Gets or sets a value indicating whether old metrics data + /// will be auto purged. + /// + public bool MetricsPurgeEnabled { get; set; } + + /// + /// Gets or sets a value indicating whether target base scaling is enabled at the host level. + /// + public bool IsTargetScalingEnabled { get; set; } + + /// + /// Gets or sets a value indicating whether runtime scale monitoring is enabled at the host level. + /// + public bool IsRuntimeScalingEnabled { get; set; } + + public string Format() + { + // only log options if scaling is enabled + if (IsRuntimeScalingEnabled || IsTargetScalingEnabled) + { + var options = new JObject + { + { nameof(ScaleMetricsMaxAge), ScaleMetricsMaxAge }, + { nameof(ScaleMetricsSampleInterval), ScaleMetricsSampleInterval }, + { nameof(MetricsPurgeEnabled), MetricsPurgeEnabled }, + { nameof(IsTargetScalingEnabled), IsTargetScalingEnabled }, + { nameof(IsRuntimeScalingEnabled), IsRuntimeScalingEnabled } + }; + + return options.ToString(Formatting.Indented); + } + else + { + return null; + } + } + } +} diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/TargetScalerManager.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/TargetScalerManager.cs index 35ea88062..c4fc11e8a 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Scale/TargetScalerManager.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/TargetScalerManager.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System.Collections.Generic; +using System.Linq; namespace Microsoft.Azure.WebJobs.Host.Scale { @@ -14,12 +15,15 @@ public TargetScalerManager() { } - public TargetScalerManager(IEnumerable targetScalers) + public TargetScalerManager(IEnumerable targetScalers, IEnumerable scalerProviders) { // add any initial target scalers coming from DI // additional monitors can be added at runtime // via Register _targetScalers.AddRange(targetScalers); + + // add scalers coming from any registered providers + _targetScalers.AddRange(scalerProviders.Select(p => p.GetTargetScaler())); } public void Register(ITargetScaler targetScaler) diff --git a/src/Microsoft.Azure.WebJobs.Host/Scale/TriggerMetadata.cs b/src/Microsoft.Azure.WebJobs.Host/Scale/TriggerMetadata.cs new file mode 100644 index 000000000..d875b49cf --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Scale/TriggerMetadata.cs @@ -0,0 +1,52 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using Newtonsoft.Json.Linq; + +namespace Microsoft.Azure.WebJobs.Host.Scale +{ + /// + /// Represents trigger metadata. + /// + public class TriggerMetadata + { + private readonly string _functionName; + private readonly string _type; + + public TriggerMetadata(JObject metadata) + : this(metadata, new Dictionary(StringComparer.OrdinalIgnoreCase)) + { + } + + public TriggerMetadata(JObject metadata, IDictionary properties) + { + Metadata = metadata ?? throw new ArgumentNullException(nameof(metadata)); + Properties = properties ?? new Dictionary(StringComparer.OrdinalIgnoreCase); + + _functionName = Metadata.GetValue("functionName", StringComparison.OrdinalIgnoreCase)?.Value(); + _type = Metadata.GetValue("type", StringComparison.OrdinalIgnoreCase)?.Value(); + } + + /// + /// Gets the name of the Function this trigger belongs to. + /// + public string FunctionName => _functionName; + + /// + /// Gets the type of the trigger. + /// + public string Type => _type; + + /// + /// Gets all the properties tagged to this instance. + /// + public IDictionary Properties { get; } + + /// + /// Gets the trigger metadata. + /// + public JObject Metadata { get; } + } +} diff --git a/test/Microsoft.Azure.WebJobs.Host.EndToEndTests/AsyncChainEndToEndTests.cs b/test/Microsoft.Azure.WebJobs.Host.EndToEndTests/AsyncChainEndToEndTests.cs index d98f789f8..830f628b8 100644 --- a/test/Microsoft.Azure.WebJobs.Host.EndToEndTests/AsyncChainEndToEndTests.cs +++ b/test/Microsoft.Azure.WebJobs.Host.EndToEndTests/AsyncChainEndToEndTests.cs @@ -150,7 +150,7 @@ public async Task AsyncChainEndToEnd() "Executed 'AsyncChainEndToEndTests.QueueToQueueAsync' (Succeeded, Id=", $"Executing 'AsyncChainEndToEndTests.QueueToBlobAsync' (Reason='New queue message detected on '{secondQueueName}'.', Id=", "Executed 'AsyncChainEndToEndTests.QueueToBlobAsync' (Succeeded, Id=", - $"Executing 'AsyncChainEndToEndTests.BlobToBlobAsync' (Reason='New blob detected: {blobContainerName}/Blob1', Id=", + $"Executing 'AsyncChainEndToEndTests.BlobToBlobAsync' (Reason='New blob detected(LogsAndContainerScan): {blobContainerName}/Blob1', Id=", "Executed 'AsyncChainEndToEndTests.BlobToBlobAsync' (Succeeded, Id=", "Job host stopped", "Executing 'AsyncChainEndToEndTests.ReadResultBlob' (Reason='This function was programmatically called via the host APIs.', Id=", diff --git a/test/Microsoft.Azure.WebJobs.Host.EndToEndTests/Scale/ScaleHostEndToEndTests.cs b/test/Microsoft.Azure.WebJobs.Host.EndToEndTests/Scale/ScaleHostEndToEndTests.cs new file mode 100644 index 000000000..ddd585cfb --- /dev/null +++ b/test/Microsoft.Azure.WebJobs.Host.EndToEndTests/Scale/ScaleHostEndToEndTests.cs @@ -0,0 +1,375 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Azure.WebJobs.Host.TestCommon; +using Microsoft.Extensions.Azure; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Newtonsoft.Json.Linq; +using Xunit; +using static Microsoft.Azure.WebJobs.Host.EndToEndTests.Scale.ScaleHostEndToEndTests; + +namespace Microsoft.Azure.WebJobs.Host.EndToEndTests.Scale +{ + [Trait(TestTraits.CategoryTraitName, TestTraits.ScaleMonitoring)] + public class ScaleHostEndToEndTests + { + private const string Function1Name = "Function1"; + private const string Function2Name = "Function2"; + private const string Function3Name = "Function3"; + + private const string ATriggerType = "testExtensionATrigger"; + private const string BTriggerType = "testExtensionBTrigger"; + + [Theory] + [InlineData(false)] + [InlineData(true)] + public async Task ScaleManager_GetScaleStatusAsync_ReturnsExpected(bool tbsEnabled) + { + var triggerMetadata = new List() + { + new TriggerMetadata(new JObject { { "functionName", $"{Function1Name}" }, { "type", $"{ATriggerType}" } }), + new TriggerMetadata(new JObject { { "functionName", $"{Function2Name}" }, { "type", $"{ATriggerType}" } }), + new TriggerMetadata(new JObject { { "functionName", $"{Function3Name}" }, { "type", $"{BTriggerType}" } }, new Dictionary { { "foo", "bar" } }) + }; + + string hostJson = + @"{ + ""extensions"": { + ""testExtensionA"" : { + ""foo"": 16 + }, + ""testExtensionB"" : { + ""bar"": 8 + } + } + }"; + + string hostId = "test-host"; + var loggerProvider = new TestLoggerProvider(); + + IHostBuilder hostBuilder = new HostBuilder(); + hostBuilder.ConfigureLogging(configure => + { + configure.SetMinimumLevel(LogLevel.Debug); + configure.AddProvider(loggerProvider); + }); + hostBuilder.ConfigureAppConfiguration((hostBuilderContext, config) => + { + // Adding host.json here + config.AddJsonStream(new MemoryStream(Encoding.UTF8.GetBytes(hostJson))); + + // Adding app settings + config.AddInMemoryCollection(new Dictionary + { + { "app_setting1", "value1" }, + { "app_setting2", "value2"}, + { $"{Constants.HostingConfigSectionName}:Microsoft.Azure.WebJobs.Host.EndToEndTests", "1" } + }); + }) + .ConfigureServices(services => + { + services.AddAzureClientsCore(); + services.AddAzureStorageScaleServices(); + + services.AddSingleton(); + services.AddSingleton(); + }) + .ConfigureWebJobsScale((context, builder) => + { + builder.UseHostId(hostId); + }, + scaleOptions => + { + scaleOptions.IsTargetScalingEnabled = tbsEnabled; + scaleOptions.MetricsPurgeEnabled = false; + scaleOptions.ScaleMetricsMaxAge = TimeSpan.FromMinutes(4); + scaleOptions.IsRuntimeScalingEnabled = true; + }); + + foreach (TriggerMetadata t in triggerMetadata) + { + if (t.Type == ATriggerType) + { + hostBuilder.ConfigureTestExtensionAScale(t); + } + else + { + hostBuilder.ConfigureTestExtensionBScale(t); + } + } + + IHost scaleHost = hostBuilder.Build(); + await scaleHost.StartAsync(); + + IHostedService scaleMonitorService = scaleHost.Services.GetService(); + Assert.NotNull(scaleMonitorService); + + var concurrencyStatusRepositories = scaleHost.Services.GetServices().ToList(); + Assert.True(concurrencyStatusRepositories.Count == 2); + // Validate that internal BlobStorageConcurrencyStatusRepository is available + Assert.True(concurrencyStatusRepositories.SingleOrDefault(x => x.GetType().Name == "BlobStorageConcurrencyStatusRepository") != null); + Assert.True(concurrencyStatusRepositories.SingleOrDefault(x => x is TestConcurrencyStatusRepository) != null); + + // Validate IConfiguration + var section = scaleHost.Services.GetService().GetSection(Constants.HostingConfigSectionName); + Assert.False(section.GetValue("sovemalue") == "1"); + Assert.True(section.GetValue("Microsoft.Azure.WebJobs.Host.EndToEndTests") == "1"); + Assert.True(section.GetValue("microsoft.azure.webJobs.host.endtoendtests") == "1"); + + await TestHelpers.Await(async () => + { + IScaleStatusProvider scaleManager = scaleHost.Services.GetService(); + + var scaleStatus = await scaleManager.GetScaleStatusAsync(new ScaleStatusContext()); + + bool scaledOut = false; + if (!tbsEnabled) + { + scaledOut = scaleStatus.Vote == ScaleVote.ScaleOut && scaleStatus.TargetWorkerCount == null && scaleStatus.FunctionTargetScalerResults.Count == 0 + && scaleStatus.FunctionScaleStatuses[Function1Name].Vote == ScaleVote.ScaleOut + && scaleStatus.FunctionScaleStatuses[Function2Name].Vote == ScaleVote.ScaleOut + && scaleStatus.FunctionScaleStatuses[Function3Name].Vote == ScaleVote.ScaleOut; + + if (scaledOut) + { + var logMessages = loggerProvider.GetAllLogMessages().Select(p => p.FormattedMessage).ToArray(); + Assert.Contains(logMessages, p => p.Contains("3 scale monitors to sample")); + } + } + else + { + scaledOut = scaleStatus.Vote == ScaleVote.ScaleOut && scaleStatus.TargetWorkerCount == 4 && scaleStatus.FunctionScaleStatuses.Count == 0 + && scaleStatus.FunctionTargetScalerResults[Function1Name].TargetWorkerCount == 2 + && scaleStatus.FunctionTargetScalerResults[Function2Name].TargetWorkerCount == 3 + && scaleStatus.FunctionTargetScalerResults[Function3Name].TargetWorkerCount == 4; + + if (scaledOut) + { + var logMessages = loggerProvider.GetAllLogMessages().Select(p => p.FormattedMessage).ToArray(); + Assert.Contains(logMessages, p => p.Contains("3 target scalers to sample")); + } + } + + if (scaledOut) + { + var logMessages = loggerProvider.GetAllLogMessages().Select(p => p.FormattedMessage).ToArray(); + Assert.Contains(logMessages, p => p.Contains("Runtime scale monitoring is enabled.")); + if (!tbsEnabled) + { + Assert.Contains(logMessages, p => p.Contains("Scaling out based on votes")); + } + } + + return scaledOut; + }, pollingInterval: 1000); + } + + private class TestConcurrencyStatusRepository : IConcurrencyStatusRepository + { + public TestConcurrencyStatusRepository() + { + } + + public Task WriteAsync(HostConcurrencySnapshot snapshot, CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + + public Task ReadAsync(CancellationToken cancellationToken) + { + return Task.FromResult(new HostConcurrencySnapshot() + { + FunctionSnapshots = new Dictionary() + { + { Function1Name, new FunctionConcurrencySnapshot() { Concurrency = 1 } }, + { Function2Name, new FunctionConcurrencySnapshot() { Concurrency = 1 } } + } + }); + } + } + + private class TestScaleMetricsRepository : IScaleMetricsRepository + { + private IDictionary _cache; + + public Task WriteMetricsAsync(IDictionary monitorMetrics) + { + _cache = monitorMetrics; + return Task.CompletedTask; + } + + public Task>> ReadMetricsAsync(IEnumerable monitors) + { + IDictionary> result = new Dictionary>(); + if (_cache != null) + { + foreach (var pair in _cache) + { + result[pair.Key] = new List(); + } + } + + return Task.FromResult(result); + } + } + + internal abstract class TestExtensionScalerProvider : IScaleMonitorProvider, ITargetScalerProvider + { + private IOptions _scaleOptions; + private IScaleMonitor _scaleMonitor; + private ITargetScaler _targetScaler; + private TriggerMetadata _triggerMetadata; + + public TestExtensionScalerProvider(IConfiguration config, IOptions scaleOptions, TriggerMetadata triggerMetadata) + { + Assert.Equal(scaleOptions.Value.ScaleMetricsMaxAge, TimeSpan.FromMinutes(4)); + + _scaleOptions = scaleOptions; + _triggerMetadata = triggerMetadata; + } + + public IScaleMonitor GetMonitor() + { + if (_scaleMonitor == null) + { + _scaleMonitor = new TestScaleMonitor(_triggerMetadata.FunctionName, _triggerMetadata.FunctionName); + } + return _scaleMonitor; + } + + public ITargetScaler GetTargetScaler() + { + if (_targetScaler == null) + { + _targetScaler = new TestTargetScaler(_triggerMetadata.FunctionName); + } + return _targetScaler; + } + } + } + public static class TestExtensionAHostBuilderExtensions + { + public static IHostBuilder ConfigureTestExtensionAScale(this IHostBuilder builder, TriggerMetadata triggerMetadata) + { + builder.ConfigureServices((context, services) => + { + services.AddSingleton(serviceProvider => + { + IConfiguration config = serviceProvider.GetService(); + IOptions scaleOptions = serviceProvider.GetService>(); + return new TestExtensionAScalerProvider(config, scaleOptions, triggerMetadata); + }); + services.AddSingleton(serviceProvider => + { + IConfiguration config = serviceProvider.GetService(); + IOptions scaleOptions = serviceProvider.GetService>(); + return new TestExtensionAScalerProvider(config, scaleOptions, triggerMetadata); + }); + }); + + return builder; + } + + private class TestExtensionAScalerProvider : TestExtensionScalerProvider + { + public TestExtensionAScalerProvider(IConfiguration config, IOptions scaleOptions, TriggerMetadata triggerMetadata) + : base(config, scaleOptions, triggerMetadata) + { + // verify we can access configuration settings + var appSetting = config.GetValue("app_setting1"); + Assert.NotNull(appSetting); + var hostJsonSetting = config.GetValue("extensions:testExtensionA:foo"); + Assert.NotNull(hostJsonSetting); + } + } + } + + public static class TestExtensionBHostBuilderExtensions + { + public static IHostBuilder ConfigureTestExtensionBScale(this IHostBuilder builder, TriggerMetadata triggerMetadata) + { + builder.ConfigureServices((context, services) => + { + services.AddSingleton(serviceProvider => + { + IConfiguration config = serviceProvider.GetService(); + IOptions scaleOptions = serviceProvider.GetService>(); + return new TestExtensionBScalerProvider(config, scaleOptions, triggerMetadata); + }); + services.AddSingleton(serviceProvider => + { + IConfiguration config = serviceProvider.GetService(); + IOptions scaleOptions = serviceProvider.GetService>(); + return new TestExtensionBScalerProvider(config, scaleOptions, triggerMetadata); + }); + }); + + return builder; + } + + private class TestExtensionBScalerProvider : TestExtensionScalerProvider + { + public TestExtensionBScalerProvider(IConfiguration config, IOptions scaleOptions, TriggerMetadata triggerMetadata) + : base(config, scaleOptions, triggerMetadata) + { + } + } + } + + internal class TestScaleMonitor : IScaleMonitor + { + public ScaleMonitorDescriptor Descriptor { get; set; } + + public TestScaleMonitor(string id, string functionId) + { + Descriptor = new ScaleMonitorDescriptor(id, functionId); + } + + public Task GetMetricsAsync() + { + return Task.FromResult(new ScaleMetrics()); + } + + public ScaleStatus GetScaleStatus(ScaleStatusContext context) + { + int targetWorkerCount = (int)Char.GetNumericValue(Descriptor.FunctionId[Descriptor.FunctionId.Length - 1]); + + return new ScaleStatus() + { + Vote = ScaleVote.ScaleOut + }; + } + } + + internal class TestTargetScaler : ITargetScaler + { + public TargetScalerDescriptor TargetScalerDescriptor { get; set; } + + public TestTargetScaler(string functionId) + { + TargetScalerDescriptor = new TargetScalerDescriptor(functionId); + } + + public Task GetScaleResultAsync(TargetScalerContext context) + { + int targetWorkerCount = (int)Char.GetNumericValue(TargetScalerDescriptor.FunctionId[TargetScalerDescriptor.FunctionId.Length - 1]) + 1; + + return Task.FromResult(new TargetScalerResult() + { + TargetWorkerCount = targetWorkerCount + }); + } + } +} diff --git a/test/Microsoft.Azure.WebJobs.Host.EndToEndTests/WebJobs.Host.EndToEndTests.csproj b/test/Microsoft.Azure.WebJobs.Host.EndToEndTests/WebJobs.Host.EndToEndTests.csproj index 9526df709..4d7005fb7 100644 --- a/test/Microsoft.Azure.WebJobs.Host.EndToEndTests/WebJobs.Host.EndToEndTests.csproj +++ b/test/Microsoft.Azure.WebJobs.Host.EndToEndTests/WebJobs.Host.EndToEndTests.csproj @@ -24,17 +24,17 @@ - + all - + - + diff --git a/test/Microsoft.Azure.WebJobs.Host.FunctionalTests/WebJobs.Host.FunctionalTests.csproj b/test/Microsoft.Azure.WebJobs.Host.FunctionalTests/WebJobs.Host.FunctionalTests.csproj index 3ee61aad5..5624800dd 100644 --- a/test/Microsoft.Azure.WebJobs.Host.FunctionalTests/WebJobs.Host.FunctionalTests.csproj +++ b/test/Microsoft.Azure.WebJobs.Host.FunctionalTests/WebJobs.Host.FunctionalTests.csproj @@ -31,7 +31,7 @@ - + diff --git a/test/Microsoft.Azure.WebJobs.Host.TestCommon/TestTraits.cs b/test/Microsoft.Azure.WebJobs.Host.TestCommon/TestTraits.cs index 56bf5dc76..19dc9d38d 100644 --- a/test/Microsoft.Azure.WebJobs.Host.TestCommon/TestTraits.cs +++ b/test/Microsoft.Azure.WebJobs.Host.TestCommon/TestTraits.cs @@ -8,5 +8,7 @@ public static class TestTraits public const string CategoryTraitName = "Category"; public const string DynamicConcurrency = "DynamicConcurrency"; + + public const string ScaleMonitoring = "ScaleMonitoring"; } } diff --git a/test/Microsoft.Azure.WebJobs.Host.UnitTests/PublicSurfaceTests.cs b/test/Microsoft.Azure.WebJobs.Host.UnitTests/PublicSurfaceTests.cs index 438e31804..d52ed7f71 100644 --- a/test/Microsoft.Azure.WebJobs.Host.UnitTests/PublicSurfaceTests.cs +++ b/test/Microsoft.Azure.WebJobs.Host.UnitTests/PublicSurfaceTests.cs @@ -14,6 +14,7 @@ namespace Microsoft.Azure.WebJobs.Host.UnitTests /// fail any time new dependencies or public surface area are added, ensuring /// we review such additions carefully. /// + [Trait(TestTraits.CategoryTraitName, TestTraits.ScaleMonitoring)] public class PublicSurfaceTests { [Fact] @@ -304,7 +305,12 @@ public void WebJobs_Host_VerifyPublicSurfaceArea() "ITargetScalerProvider", "TargetScalerDescriptor", "TargetScalerResult", - "TargetScalerContext" + "TargetScalerContext", + "IScaleMetricsRepository", + "IScaleStatusProvider", + "ScaleOptions", + "TriggerMetadata", + "AggregateScaleStatus" }; TestHelpers.AssertPublicTypes(expected, assembly); diff --git a/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/ScaleManagerTests.cs b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/ScaleManagerTests.cs new file mode 100644 index 000000000..caf8fc3d2 --- /dev/null +++ b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/ScaleManagerTests.cs @@ -0,0 +1,398 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Azure.WebJobs.Host.TestCommon; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Moq; +using Xunit; + +namespace Microsoft.Azure.WebJobs.Host.UnitTests.Scale +{ + [Trait(TestTraits.CategoryTraitName, TestTraits.ScaleMonitoring)] + public class ScaleManagerTests + { + private readonly Mock _monitorManagerMock; + private readonly Mock _metricsRepositoryMock; + private readonly Mock _targetScalerManagerMock; + private readonly Mock _concurrencyStatusRepositoryMock; + private readonly ILoggerFactory _loggerFactory; + private readonly TestLoggerProvider _loggerProvider; + private readonly List _monitors; + private readonly List _targetScalers; + private readonly ILogger _testLogger; + private readonly IOptions _scaleOptions; + private readonly IConfiguration _configuration; + private readonly HashSet _targetScalersInError; + + public ScaleManagerTests() + { + _monitors = new List(); + _targetScalers = new List(); + _loggerProvider = new TestLoggerProvider(); + _loggerFactory = new LoggerFactory(); + _loggerFactory.AddProvider(_loggerProvider); + _testLogger = _loggerFactory.CreateLogger(); + + _monitorManagerMock = new Mock(MockBehavior.Strict); + _monitorManagerMock.Setup(p => p.GetMonitors()).Returns(() => _monitors); + _targetScalerManagerMock = new Mock(MockBehavior.Strict); + _targetScalerManagerMock.Setup(p => p.GetTargetScalers()).Returns(() => _targetScalers); + _metricsRepositoryMock = new Mock(MockBehavior.Strict); + _metricsRepositoryMock.Setup(x => x.ReadMetricsAsync(It.IsAny>())).ReturnsAsync(new Dictionary>()); + _concurrencyStatusRepositoryMock = new Mock(MockBehavior.Strict); + _concurrencyStatusRepositoryMock.Setup(p => p.ReadAsync(It.IsAny())).ReturnsAsync( + new HostConcurrencySnapshot() + { + FunctionSnapshots = new Dictionary() + { + { "func1", new FunctionConcurrencySnapshot() { Concurrency = 1 } } + } + }); + _scaleOptions = Options.Create(new ScaleOptions() + { + IsTargetScalingEnabled = true, + ScaleMetricsSampleInterval = TimeSpan.FromSeconds(10) + }); + + _configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary { { $"{Constants.HostingConfigSectionName}:Microsoft.Azure.WebJobs.Host.UnitTests", "1" } }).Build(); + + _targetScalersInError = new HashSet(); + } + + [Theory] + [InlineData(0, ScaleVote.None)] + [InlineData(1, ScaleVote.ScaleIn)] + public async Task GetScaleStatus_NoMonitors_ReturnsExpectedStatus(int workerCount, ScaleVote expected) + { + var context = new ScaleStatusContext + { + WorkerCount = workerCount + }; + ScaleManager scaleManager = new ScaleManager(_monitorManagerMock.Object, _targetScalerManagerMock.Object, _metricsRepositoryMock.Object, _concurrencyStatusRepositoryMock.Object, _scaleOptions, _loggerFactory, _configuration); + var status = await scaleManager.GetScaleStatusAsync(context); + + Assert.Equal(expected, status.Vote); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task GetScaleStatus_ReturnsExpectedResult(bool tbsEnabled) + { + var context = new ScaleStatusContext + { + WorkerCount = 3 + }; + + var monitor1 = new TestScaleMonitor("func1-test-test", "func1"); + monitor1.Status = new ScaleStatus + { + Vote = ScaleVote.ScaleIn + }; + var monitor2 = new TestScaleMonitor1(); + monitor2.Status = new ScaleStatus + { + Vote = ScaleVote.ScaleOut + }; + var monitor3 = new TestScaleMonitor2(); + monitor3.Status = new ScaleStatus + { + Vote = ScaleVote.ScaleIn + }; + + List monitors = new List + { + monitor1 + }; + if (!tbsEnabled) + { + monitors.Add(monitor2); + monitors.Add(monitor3); + } + _monitorManagerMock.Setup(p => p.GetMonitors()).Returns(monitors); + + var monitorMetrics = new Dictionary> + { + { monitor1, new List() }, + { monitor2, new List() }, + { monitor3, new List() } + }; + _metricsRepositoryMock.Setup(p => p.ReadMetricsAsync(It.IsAny>())).ReturnsAsync(monitorMetrics); + + var targetScaler1 = new TestTargetScaler() + { + Result = new TargetScalerResult() + { + TargetWorkerCount = 2 + }, + TargetScalerDescriptor = new TargetScalerDescriptor("func1") + }; + + _targetScalerManagerMock.Setup(p => p.GetTargetScalers()).Returns(new List { targetScaler1 }); + + IOptions options = Options.Create(new ScaleOptions + { + IsTargetScalingEnabled = tbsEnabled, + }); + + ScaleManager scaleManager = new ScaleManager(_monitorManagerMock.Object, _targetScalerManagerMock.Object, _metricsRepositoryMock.Object, _concurrencyStatusRepositoryMock.Object, options, _loggerFactory, _configuration); + + var status = await scaleManager.GetScaleStatusAsync(context); + + var logs = _loggerProvider.GetAllLogMessages().ToArray(); + if (!tbsEnabled) + { + Assert.Equal("Computing scale status (WorkerCount=3)", logs[0].FormattedMessage); + Assert.Equal("3 scale monitors to sample", logs[1].FormattedMessage); + Assert.Equal("Monitor 'func1-test-test' voted 'ScaleIn'", logs[2].FormattedMessage); + Assert.Equal("Monitor 'testscalemonitor1' voted 'ScaleOut'", logs[3].FormattedMessage); + Assert.Equal("Monitor 'testscalemonitor2' voted 'ScaleIn'", logs[4].FormattedMessage); + Assert.Equal(ScaleVote.ScaleOut, status.Vote); + Assert.Equal(null, status.TargetWorkerCount); + } + else + { + Assert.Equal("1 target scalers to sample", logs[0].FormattedMessage); + Assert.Equal("Snapshot dynamic concurrency for target scaler 'func1' is '1'", logs[1].FormattedMessage); + Assert.Equal("Target worker count for 'func1' is '2'", logs[2].FormattedMessage); + Assert.Equal(ScaleVote.ScaleIn, status.Vote); + Assert.Equal(2, status.TargetWorkerCount); + } + } + + [Fact] + public async Task GetScaleStatus_MonitorFails_ReturnsExpectedResult() + { + var context = new ScaleStatusContext + { + WorkerCount = 3 + }; + + var mockMonitor1 = new Mock(MockBehavior.Strict); + mockMonitor1.Setup(p => p.GetScaleStatus(It.Is(q => q.WorkerCount == context.WorkerCount))).Returns(new ScaleStatus { Vote = ScaleVote.ScaleIn }); + mockMonitor1.SetupGet(p => p.Descriptor).Returns(new ScaleMonitorDescriptor("testscalemonitor1")); + var mockMonitor2 = new Mock(MockBehavior.Strict); + mockMonitor2.SetupGet(p => p.Descriptor).Returns(new ScaleMonitorDescriptor("testscalemonitor2")); + var exception = new Exception("Kaboom!"); + mockMonitor2.Setup(p => p.GetScaleStatus(It.Is(q => q.WorkerCount == context.WorkerCount))).Throws(exception); + var mockMonitor3 = new Mock(MockBehavior.Strict); + mockMonitor3.Setup(p => p.GetScaleStatus(It.Is(q => q.WorkerCount == context.WorkerCount))).Returns(new ScaleStatus { Vote = ScaleVote.ScaleIn }); + mockMonitor3.SetupGet(p => p.Descriptor).Returns(new ScaleMonitorDescriptor("testscalemonitor3")); + List monitors = new List + { + mockMonitor1.Object, + mockMonitor2.Object, + mockMonitor3.Object + }; + + _monitorManagerMock.Setup(p => p.GetMonitors()).Returns(monitors); + + var monitorMetrics = new Dictionary> + { + { mockMonitor1.Object, new List() }, + { mockMonitor2.Object, new List() }, + { mockMonitor3.Object, new List() } + }; + _metricsRepositoryMock.Setup(p => p.ReadMetricsAsync(It.IsAny>())).ReturnsAsync(monitorMetrics); + + ScaleManager scaleManager = new ScaleManager(_monitorManagerMock.Object, _targetScalerManagerMock.Object, _metricsRepositoryMock.Object, _concurrencyStatusRepositoryMock.Object, _scaleOptions, _loggerFactory, _configuration); + var status = await scaleManager.GetScaleStatusAsync(context); + + var logs = _loggerProvider.GetAllLogMessages().ToArray(); + Assert.Equal("Computing scale status (WorkerCount=3)", logs[0].FormattedMessage); + Assert.Equal("3 scale monitors to sample", logs[1].FormattedMessage); + Assert.Equal("Monitor 'testscalemonitor1' voted 'ScaleIn'", logs[2].FormattedMessage); + Assert.Equal("Failed to query scale status for monitor 'testscalemonitor2'.", logs[3].FormattedMessage); + Assert.Same(exception, logs[3].Exception); + Assert.Equal("Monitor 'testscalemonitor3' voted 'ScaleIn'", logs[4].FormattedMessage); + + Assert.Equal(null, status.TargetWorkerCount); + Assert.Equal(ScaleVote.ScaleIn, status.Vote); + } + + [Fact] + public async Task GetScaleStatus_TargetScalerFails_ReturnsExpectedResult() + { + var context = new ScaleStatusContext + { + WorkerCount = 3 + }; + + var targetScaler1 = new TestTargetScaler { Result = new TargetScalerResult { TargetWorkerCount = 3 }, TargetScalerDescriptor = new TargetScalerDescriptor("func1") }; + var targetScaler2 = new TestTargetScaler2 { Result = new TargetScalerResult { TargetWorkerCount = 1 }, TargetScalerDescriptor = new TargetScalerDescriptor("func2") }; + var targetScaler3 = new TestTargetScaler { Result = new TargetScalerResult { TargetWorkerCount = -3 }, TargetScalerDescriptor = new TargetScalerDescriptor("func3") }; + List targetScalers = new List + { + targetScaler1, + targetScaler2, + targetScaler3 + }; + _targetScalerManagerMock.Setup(p => p.GetTargetScalers()).Returns(targetScalers); + + IOptions options = Options.Create(new ScaleOptions + { + IsTargetScalingEnabled = true, + }); + + ScaleManager scaleManager = new ScaleManager(_monitorManagerMock.Object, _targetScalerManagerMock.Object, _metricsRepositoryMock.Object, _concurrencyStatusRepositoryMock.Object, options, _loggerFactory, _configuration); + + var status = await scaleManager.GetScaleStatusAsync(context); + + var logs = _loggerProvider.GetAllLogMessages().ToArray(); + Assert.Equal("3 target scalers to sample", logs[0].FormattedMessage); + Assert.Equal($"Snapshot dynamic concurrency for target scaler 'func1' is '1'", logs[1].FormattedMessage); + Assert.Equal("Target worker count for 'func1' is '3'", logs[2].FormattedMessage); + Assert.Equal("Failed to query scale result for target scaler 'func2'.", logs[3].FormattedMessage); + Assert.Same("test", logs[3].Exception.Message); + Assert.Equal("Target worker count for 'func3' is '-3'", logs[4].FormattedMessage); + + Assert.Equal(3, status.TargetWorkerCount); + Assert.Equal(ScaleVote.None, status.Vote); + } + + [Theory] + [InlineData(0, 0, 0, ScaleVote.None)] + [InlineData(1, 0, 0, ScaleVote.ScaleIn)] + [InlineData(1, 1, 3, ScaleVote.ScaleOut)] + [InlineData(0, 0, 1, ScaleVote.None)] + [InlineData(1, 0, 1, ScaleVote.ScaleIn)] + [InlineData(5, 0, 3, ScaleVote.ScaleIn)] + public void GetAggregateScaleVote_ReturnsExpectedResult(int workerCount, int numScaleOutVotes, int numScaleInVotes, ScaleVote expected) + { + var context = new ScaleStatusContext + { + WorkerCount = workerCount + }; + List votes = new List(); + for (int i = 0; i < numScaleOutVotes; i++) + { + votes.Add(ScaleVote.ScaleOut); + } + for (int i = 0; i < numScaleInVotes; i++) + { + votes.Add(ScaleVote.ScaleIn); + } + var vote = ScaleManager.GetAggregateScaleVote(votes, context, _testLogger); + Assert.Equal(expected, vote); + } + + [Theory] + [InlineData(false, true, 1, 0)] + [InlineData(true, false, 1, 0)] + [InlineData(true, true, 0, 1)] + public void GetScalersToSample_Returns_Expected(bool targetBaseScalingEnabled, bool triggerEnabled, int expectedScaleMonitorCount, int expectedTargetScalerCount) + { + List scaleMonitors = new List + { + new TestScaleMonitor("func1-test-test", "func1"), + }; + Mock scaleMonitorManagerMock = new Mock(MockBehavior.Strict); + scaleMonitorManagerMock.Setup(x => x.GetMonitors()).Returns(scaleMonitors); + + List targetScalers = new List + { + new TestTargetScaler() + { + TargetScalerDescriptor = new TargetScalerDescriptor("func1") + } + }; + Mock targetScalerManagerMock = new Mock(MockBehavior.Strict); + targetScalerManagerMock.Setup(x => x.GetTargetScalers()).Returns(targetScalers); + + IOptions options = Options.Create(new ScaleOptions + { + IsTargetScalingEnabled = targetBaseScalingEnabled, + }); + + var configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary { { $"{Constants.HostingConfigSectionName}:Microsoft.Azure.WebJobs.Host.UnitTests", triggerEnabled ? "1" : "0" } }).Build(); + + + var (scaleMonitorsToProcess, targetScalesToProcess) = ScaleManager.GetScalersToSample( + scaleMonitorManagerMock.Object, + targetScalerManagerMock.Object, + options, + configuration + ); + + Assert.Equal(scaleMonitorsToProcess.Count(), expectedScaleMonitorCount); + Assert.Equal(targetScalesToProcess.Count(), expectedTargetScalerCount); + } + + [Fact] + public async Task GetScalersToSample_FallsBackToMonitor_OnTargetScalerError() + { + List scaleMonitors = new List + { + new TestScaleMonitor("function1-test-test", "function1") + }; + Mock scaleMonitorManagerMock = new Mock(MockBehavior.Strict); + scaleMonitorManagerMock.Setup(x => x.GetMonitors()).Returns(scaleMonitors); + + List targetScalers = new List + { + new FaultyTargetScaler() + { + TargetScalerDescriptor = new TargetScalerDescriptor("function1") + }, + new TestTargetScaler2() + { + TargetScalerDescriptor = new TargetScalerDescriptor("function2") + } + }; + Mock targetScalerManagerMock = new Mock(MockBehavior.Strict); + targetScalerManagerMock.Setup(x => x.GetTargetScalers()).Returns(targetScalers); + + var context = new ScaleStatusContext() + { + WorkerCount = 1 + }; + + IOptions options = Options.Create(new ScaleOptions + { + IsTargetScalingEnabled = true, + }); + + ScaleManager scaleManager = new ScaleManager(scaleMonitorManagerMock.Object, targetScalerManagerMock.Object, _metricsRepositoryMock.Object, _concurrencyStatusRepositoryMock.Object, options, _loggerFactory, _configuration); + + var (monitors1, scalers1) = ScaleManager.GetScalersToSample( //Col1 + scaleMonitorManagerMock.Object, + targetScalerManagerMock.Object, + _scaleOptions, + _configuration + ); + Assert.Equal(monitors1.Count(), 0); + Assert.Equal(scalers1.Count(), 2); + AggregateScaleStatus resutl1 = await scaleManager.GetScaleStatusAsync(context); // Col2 + + Assert.Equal(resutl1.TargetWorkerCount, 1); + Assert.Equal(resutl1.Vote, ScaleVote.None); + var logs = _loggerProvider.GetAllLogMessages().Select(x => x.FormattedMessage).ToArray(); + Assert.Single(logs.Where(x => x == "Unable to use target based scaling for Function 'function1'. Metrics monitoring will be used.")); + _loggerProvider.ClearAllLogMessages(); + + var (monitors2, scalers2) = ScaleManager.GetScalersToSample( + scaleMonitorManagerMock.Object, + targetScalerManagerMock.Object, + _scaleOptions, + _configuration + ); + Assert.Equal(monitors2.Count(), 1); + Assert.Equal(scalers2.Count(), 1); + AggregateScaleStatus resutl2 = await scaleManager.GetScaleStatusAsync(context); + Assert.Equal(resutl2.TargetWorkerCount, null); + Assert.Equal(resutl2.Vote, ScaleVote.ScaleIn); + logs = _loggerProvider.GetAllLogMessages().Select(x => x.FormattedMessage).ToArray(); + Assert.Empty(logs.Where(x => x == "Unable to use target based scaling for Function 'function1'. Metrics monitoring will be used.")); + } + } +} diff --git a/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/ScaleMonitorServiceTests.cs b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/ScaleMonitorServiceTests.cs new file mode 100644 index 000000000..c97f3cacd --- /dev/null +++ b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/ScaleMonitorServiceTests.cs @@ -0,0 +1,254 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Azure.WebJobs.Host.TestCommon; +using Microsoft.Azure.WebJobs.Hosting; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Moq; +using Xunit; + +namespace Microsoft.Azure.WebJobs.Host.UnitTests.Scale +{ + [Trait(TestTraits.CategoryTraitName, TestTraits.ScaleMonitoring)] + public class ScaleMonitorServiceTests + { + private readonly ScaleMonitorService _monitor; + private readonly TestMetricsRepository _metricsRepository; + private readonly TestLoggerProvider _loggerProvider; + private readonly PrimaryHostStateProvider _primaryHostStateProvider; + private readonly Mock _monitorManagerMock; + private readonly Mock _targetScalerManagerMock; + private readonly IConfiguration _configuration; + private List _monitors; + private List _scalers; + + public ScaleMonitorServiceTests() + { + _monitors = new List(); + _scalers = new List(); + + _metricsRepository = new TestMetricsRepository(); + _loggerProvider = new TestLoggerProvider(); + ILoggerFactory loggerFactory = new LoggerFactory(); + loggerFactory.AddProvider(_loggerProvider); + + Mock functionsScaleManagerMock = new Mock(); + _monitorManagerMock = new Mock(MockBehavior.Strict); + _monitorManagerMock.Setup(p => p.GetMonitors()).Returns(() => _monitors); + _targetScalerManagerMock = new Mock(MockBehavior.Strict); + _targetScalerManagerMock.Setup(p => p.GetTargetScalers()).Returns(() => _scalers); + _configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary { { $"{Constants.HostingConfigSectionName}:Microsoft.Azure.WebJobs.Host.UnitTests", "1" } }).Build(); + + IOptions options = Options.Create(new ScaleOptions() + { + ScaleMetricsSampleInterval = TimeSpan.FromSeconds(1), + IsRuntimeScalingEnabled = true + }); + + _primaryHostStateProvider = new PrimaryHostStateProvider() { IsPrimary = true }; + + _monitor = new ScaleMonitorService(functionsScaleManagerMock.Object, + _metricsRepository, + options, + _primaryHostStateProvider, + _monitorManagerMock.Object, + _targetScalerManagerMock.Object, + _configuration, + loggerFactory); + } + + [Fact] + public async Task OnTimer_ExceptionsAreHandled() + { + var monitor = new TestScaleMonitor1 + { + Exception = new Exception("Kaboom!") + }; + _monitors.Add(monitor); + + await _monitor.StartAsync(CancellationToken.None); + + // wait for a few failures to happen + LogMessage[] logs = null; + await TestHelpers.Await(() => + { + logs = _loggerProvider.GetAllLogMessages().Where(p => p.Level == LogLevel.Error).ToArray(); + return logs.Length >= 3; + }); + + Assert.All(logs, + p => + { + Assert.Same(monitor.Exception, p.Exception); + Assert.Equal("Failed to collect scale metrics sample for monitor 'testscalemonitor1'.", p.FormattedMessage); + }); + } + + [Fact] + public async Task OnTimer_DoesNotSample_WhenNotPrimaryHost() + { + _primaryHostStateProvider.IsPrimary = false; + + var monitor = new TestScaleMonitor1(); + _monitors.Add(monitor); + + await _monitor.StartAsync(CancellationToken.None); + + await Task.Delay(100); + + var logs = _loggerProvider.GetAllLogMessages().ToArray(); + Assert.Single(logs); + Assert.StartsWith("Runtime scale monitoring is enabled.", logs[0].FormattedMessage); + } + + [Fact] + public async Task OnTimer_Sample_WhenPrimaryHost() + { + var monitor = new TestScaleMonitor1(); + _monitors.Add(monitor); + + await _monitor.StartAsync(CancellationToken.None); + + await Task.Delay(2000); + + var logs = _loggerProvider.GetAllLogMessages().ToArray(); + Assert.StartsWith("Runtime scale monitoring is enabled.", logs[0].FormattedMessage); + Assert.Equal($"Taking metrics samples for 1 monitor(s).", logs[1].FormattedMessage); + } + + [Fact] + public async Task OnTimer_PersistsMetrics() + { + var testMetrics = new List + { + new TestScaleMetrics1 { Count = 10 }, + new TestScaleMetrics1 { Count = 15 }, + new TestScaleMetrics1 { Count = 45 }, + new TestScaleMetrics1 { Count = 50 }, + new TestScaleMetrics1 { Count = 100 } + }; + var monitor1 = new TestScaleMonitor1 + { + Metrics = testMetrics + }; + _monitors.Add(monitor1); + + await _monitor.StartAsync(CancellationToken.None); + + await TestHelpers.Await(() => + { + return _metricsRepository.Count >= 5; + }); + + var logs = _loggerProvider.GetAllLogMessages().ToArray(); + + var infoLogs = logs.Where(p => p.Level == LogLevel.Information); + Assert.StartsWith("Runtime scale monitoring is enabled.", logs[0].FormattedMessage); + Assert.Equal("Taking metrics samples for 1 monitor(s).", logs[1].FormattedMessage); + Assert.True(logs[2].FormattedMessage.StartsWith("Scale metrics sample for monitor 'testscalemonitor1': {\"Count\":10,")); + + var metricsWritten = _metricsRepository.Metrics[monitor1].Take(5); + Assert.Equal(testMetrics, metricsWritten); + } + + [Fact] + public async Task OnTimer_MonitorFailuresAreHandled() + { + var testMetrics1 = new List + { + new TestScaleMetrics1 { Count = 10 }, + new TestScaleMetrics1 { Count = 15 }, + new TestScaleMetrics1 { Count = 45 }, + new TestScaleMetrics1 { Count = 50 }, + new TestScaleMetrics1 { Count = 100 } + }; + var monitor1 = new TestScaleMonitor1 + { + Exception = new Exception("Kaboom!") + }; + _monitors.Add(monitor1); + + var testMetrics2 = new List + { + new TestScaleMetrics2 { Num = 300 }, + new TestScaleMetrics2 { Num = 350 }, + new TestScaleMetrics2 { Num = 400 }, + new TestScaleMetrics2 { Num = 450 }, + new TestScaleMetrics2 { Num = 500 } + }; + var monitor2 = new TestScaleMonitor2 + { + Metrics = testMetrics2 + }; + _monitors.Add(monitor2); + + await _monitor.StartAsync(CancellationToken.None); + + await TestHelpers.Await(() => + { + return _metricsRepository.Count >= 5; + }); + + var logs = _loggerProvider.GetAllLogMessages().ToArray(); + + var infoLogs = logs.Where(p => p.Level == LogLevel.Information); + Assert.StartsWith("Runtime scale monitoring is enabled.", logs[0].FormattedMessage); + Assert.Equal("Taking metrics samples for 2 monitor(s).", logs[1].FormattedMessage); + + // verify the failure logs for the failing monitor + Assert.True(logs.Count(p => p.FormattedMessage.Equals($"Failed to collect scale metrics sample for monitor 'testscalemonitor1'.")) >= 5); + + // verify each successful sample is logged + Assert.True(logs.Count(p => p.FormattedMessage.StartsWith($"Scale metrics sample for monitor 'testscalemonitor2'")) >= 5); + + var metricsWritten = _metricsRepository.Metrics[monitor2].Take(5); + Assert.Equal(testMetrics2, metricsWritten); + } + } + + public class TestMetricsRepository : IScaleMetricsRepository + { + private int _count; + + public TestMetricsRepository() + { + _count = 0; + Metrics = new Dictionary>(); + } + + public int Count => _count; + + public IDictionary> Metrics { get; set; } + + public Task>> ReadMetricsAsync(IEnumerable monitors) + { + return Task.FromResult>>(Metrics); + } + + public Task WriteMetricsAsync(IDictionary monitorMetrics) + { + foreach (var pair in monitorMetrics) + { + if (!Metrics.ContainsKey(pair.Key)) + { + Metrics[pair.Key] = new List(); + } + + Metrics[pair.Key].Add(pair.Value); + + Interlocked.Increment(ref _count); + } + + return Task.CompletedTask; + } + } +} diff --git a/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/TestScaleMonitor.cs b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/TestScaleMonitor.cs new file mode 100644 index 000000000..6bd6f2e19 --- /dev/null +++ b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/TestScaleMonitor.cs @@ -0,0 +1,114 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Azure.WebJobs.Host.Scale; + +namespace Microsoft.Azure.WebJobs.Host.UnitTests.Scale +{ + public class TestScaleMonitor : IScaleMonitor where TMetrics : ScaleMetrics + { + private readonly ScaleMonitorDescriptor _descriptor; + + public TestScaleMonitor() + { + Index = 0; + _descriptor = new ScaleMonitorDescriptor(GetType().Name.ToLower()); + } + + public TestScaleMonitor(string descriptorId, string functionId) + { + Index = 0; + _descriptor = new ScaleMonitorDescriptor(descriptorId, functionId); + } + + public ScaleMonitorDescriptor Descriptor => _descriptor; + + public Exception Exception { get; set; } + + public int Index { get; set; } + + public List Metrics { get; set; } + + public ScaleStatus Status { get; set; } + + Task IScaleMonitor.GetMetricsAsync() + { + if (Exception != null) + { + throw Exception; + } + + return Task.FromResult(Metrics[Index++ % Metrics.Count]); + } + + public async Task GetMetricsAsync() + { + return (TMetrics)(await GetMetricsAsync()); + } + + ScaleStatus IScaleMonitor.GetScaleStatus(ScaleStatusContext context) + { + return Status; + } + + public ScaleStatus GetScaleStatus(ScaleStatusContext context) + { + return ((IScaleMonitor)this).GetScaleStatus(context); + } + } + + public class TestScaleMonitor1 : TestScaleMonitor + { + } + + public class TestScaleMetrics1 : ScaleMetrics + { + public long Count { get; set; } + } + + public class TestScaleMonitor2 : TestScaleMonitor + { + } + + public class TestScaleMetrics2 : ScaleMetrics + { + public int Num { get; set; } + } + + public class TestScaleMonitor3 : TestScaleMonitor + { + } + + public class TestScaleMetrics3 : ScaleMetrics + { + public TimeSpan TimeSpan { get; set; } + + public int Length { get; set; } + } + + public class TestInvalidScaleMonitor : IScaleMonitor + { + private readonly ScaleMonitorDescriptor _descriptor; + + public TestInvalidScaleMonitor() + { + _descriptor = new ScaleMonitorDescriptor("testmonitor-invalid"); + } + + public ScaleMonitorDescriptor Descriptor => _descriptor; + + public Task GetMetricsAsync() + { + throw new NotImplementedException(); + } + + public ScaleStatus GetScaleStatus(ScaleStatusContext context) + { + throw new NotImplementedException(); + } + } +} diff --git a/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/TestTargetScaler.cs b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/TestTargetScaler.cs new file mode 100644 index 000000000..ebf6a425c --- /dev/null +++ b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Scale/TestTargetScaler.cs @@ -0,0 +1,37 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Threading.Tasks; +using Microsoft.Azure.WebJobs.Host.Scale; + +namespace Microsoft.Azure.WebJobs.Host.UnitTests.Scale +{ + internal class TestTargetScaler : ITargetScaler + { + public TargetScalerDescriptor TargetScalerDescriptor { get; set; } + + public TargetScalerResult Result { get; set; } + + public virtual Task GetScaleResultAsync(TargetScalerContext context) + { + return Task.FromResult(Result); + } + } + + internal class TestTargetScaler2 : TestTargetScaler + { + public override Task GetScaleResultAsync(TargetScalerContext context) + { + throw new Exception("test"); + } + } + + internal class FaultyTargetScaler : TestTargetScaler + { + public override Task GetScaleResultAsync(TargetScalerContext context) + { + throw new NotSupportedException("test"); + } + } +}