diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobListener.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobListener.cs index 9778626b8a0e..33a1879cbfd3 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobListener.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/BlobListener.cs @@ -13,7 +13,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.Listeners { - internal sealed class BlobListener : IListener, IScaleMonitorProvider + internal sealed class BlobListener : IListener, IScaleMonitorProvider, ITargetScalerProvider { private readonly ISharedListener _sharedListener; private readonly ILogger _logger; @@ -113,5 +113,10 @@ public IScaleMonitor GetMonitor() // so the shared queue won't be monitored. return ((IScaleMonitorProvider)_sharedListener).GetMonitor(); } + + public ITargetScaler GetTargetScaler() + { + return ((ITargetScalerProvider)_sharedListener).GetTargetScaler(); + } } } diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/SharedBlobQueueListener.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/SharedBlobQueueListener.cs index 7abc0021ef1e..c8a1931fb4e8 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/SharedBlobQueueListener.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Listeners/SharedBlobQueueListener.cs @@ -10,7 +10,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.Listeners { - internal sealed class SharedBlobQueueListener : ISharedListener, IScaleMonitorProvider + internal sealed class SharedBlobQueueListener : ISharedListener, IScaleMonitorProvider, ITargetScalerProvider { private readonly IListener _listener; private readonly BlobQueueTriggerExecutor _executor; @@ -73,7 +73,12 @@ public void Dispose() public IScaleMonitor GetMonitor() { - return (IScaleMonitor)_listener; + return ((IScaleMonitorProvider)_listener).GetMonitor(); + } + + public ITargetScaler GetTargetScaler() + { + return ((ITargetScalerProvider)_listener).GetTargetScaler(); } } } diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.csproj b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.csproj index 6b7017d4195c..205287dba8f9 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.csproj +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/src/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs.csproj @@ -1,4 +1,4 @@ - + $(RequiredTargetFrameworks) diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/tests/Listeners/BlobListenerTests.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/tests/Listeners/BlobListenerTests.cs index c721cee959bb..9ace6f6ecd6d 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/tests/Listeners/BlobListenerTests.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Blobs/tests/Listeners/BlobListenerTests.cs @@ -14,6 +14,7 @@ public class BlobListenerTests public void GetMonitor_ReturnsSharedMonitor() { var queueListener = new QueueListener(); + var watcherMock = new Mock(MockBehavior.Strict); var executor = new BlobQueueTriggerExecutor(BlobTriggerSource.LogsAndContainerScan, watcherMock.Object, NullLogger.Instance); var sharedBlobQueueListener = new SharedBlobQueueListener(queueListener, executor); @@ -25,7 +26,13 @@ public void GetMonitor_ReturnsSharedMonitor() var monitor2 = blobListener1.GetMonitor(); Assert.AreSame(monitor1, monitor2); - Assert.AreSame(monitor1, queueListener); + Assert.AreSame(monitor1, queueListener.GetMonitor()); + + var targetScaler1 = blobListener1.GetTargetScaler(); + var targetScaler2 = blobListener1.GetTargetScaler(); + + Assert.AreSame(targetScaler1, targetScaler2); + Assert.AreSame(targetScaler1, queueListener.GetTargetScaler()); } } } diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/Queues/QueueListener.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/Queues/QueueListener.cs index 3e12bf1d2c2b..cafb68763ff4 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/Queues/QueueListener.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/Queues/QueueListener.cs @@ -4,7 +4,6 @@ using System; using System.Collections.Generic; using System.Diagnostics; -using System.Globalization; using System.Linq; using System.Runtime.ExceptionServices; using System.Threading; @@ -25,10 +24,8 @@ namespace Microsoft.Azure.WebJobs.Extensions.Storage.Common.Listeners { - internal sealed partial class QueueListener : IListener, ITaskSeriesCommand, INotificationCommand, IScaleMonitor + internal sealed partial class QueueListener : IListener, ITaskSeriesCommand, INotificationCommand, ITargetScalerProvider, IScaleMonitorProvider { - private const int NumberOfSamplesToConsider = 5; - private readonly ITaskSeriesTimer _timer; private readonly IDelayStrategy _delayStrategy; private readonly QueueClient _queue; @@ -44,8 +41,9 @@ internal sealed partial class QueueListener : IListener, ITaskSeriesCommand, INo private readonly ILogger _logger; private readonly FunctionDescriptor _functionDescriptor; private readonly string _functionId; - private readonly ScaleMonitorDescriptor _scaleMonitorDescriptor; private readonly CancellationTokenSource _shutdownCancellationTokenSource; + private readonly Lazy _targetScaler; + private readonly Lazy _scaleMonitor; private bool? _queueExists; private bool _foundMessageSinceLastDelay; @@ -57,6 +55,8 @@ internal sealed partial class QueueListener : IListener, ITaskSeriesCommand, INo // for mock testing only internal QueueListener() { + _scaleMonitor = new Lazy(() => new QueueScaleMonitor()); + _targetScaler = new Lazy(() => new QueueTargetScaler()); } public QueueListener(QueueClient queue, @@ -130,10 +130,19 @@ public QueueListener(QueueClient queue, _delayStrategy = new RandomizedExponentialBackoffStrategy(QueuePollingIntervals.Minimum, maximumInterval); - _scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{_functionId}-QueueTrigger-{_queue.Name}".ToLower(CultureInfo.InvariantCulture), _functionId); _shutdownCancellationTokenSource = new CancellationTokenSource(); _concurrencyManager = concurrencyManager; + + _targetScaler = new Lazy( + () => new QueueTargetScaler( + _functionId, + queue, + queueOptions, + loggerFactory + )); + + _scaleMonitor = new Lazy(() => new QueueScaleMonitor(_functionId, _queue, loggerFactory)); } // for testing @@ -444,185 +453,14 @@ internal static void RegisterSharedWatcherWithQueueProcessor(QueueProcessor queu } } - public ScaleMonitorDescriptor Descriptor - { - get - { - return _scaleMonitorDescriptor; - } - } - - async Task IScaleMonitor.GetMetricsAsync() - { - return await GetMetricsAsync().ConfigureAwait(false); - } - - public async Task GetMetricsAsync() - { - int queueLength = 0; - TimeSpan queueTime = TimeSpan.Zero; - - try - { - QueueProperties queueProperties = await _queue.GetPropertiesAsync().ConfigureAwait(false); - queueLength = queueProperties.ApproximateMessagesCount; - - if (queueLength > 0) - { - PeekedMessage message = (await _queue.PeekMessagesAsync(1).ConfigureAwait(false)).Value.FirstOrDefault(); - if (message != null) - { - if (message.InsertedOn.HasValue) - { - queueTime = DateTime.UtcNow.Subtract(message.InsertedOn.Value.DateTime); - } - } - else - { - // ApproximateMessageCount often returns a stale value, - // especially when the queue is empty. - queueLength = 0; - } - } - } - catch (RequestFailedException ex) - { - if (ex.IsNotFoundQueueNotFound() || - ex.IsConflictQueueBeingDeletedOrDisabled() || - ex.IsServerSideError()) - { - // ignore transient errors, and return default metrics - // E.g. if the queue doesn't exist, we'll return a zero queue length - // and scale in - _logger.LogWarning($"Error querying for queue scale status: {ex.Message}"); - } - } - - return new QueueTriggerMetrics - { - QueueLength = queueLength, - QueueTime = queueTime, - Timestamp = DateTime.UtcNow - }; - } - - ScaleStatus IScaleMonitor.GetScaleStatus(ScaleStatusContext context) - { - return GetScaleStatusCore(context.WorkerCount, context.Metrics?.Cast().ToArray()); - } - - public ScaleStatus GetScaleStatus(ScaleStatusContext context) + public ITargetScaler GetTargetScaler() { - return GetScaleStatusCore(context.WorkerCount, context.Metrics?.ToArray()); + return _targetScaler.Value; } - private ScaleStatus GetScaleStatusCore(int workerCount, QueueTriggerMetrics[] metrics) + public IScaleMonitor GetMonitor() { - ScaleStatus status = new ScaleStatus - { - Vote = ScaleVote.None - }; - - // verify we have enough samples to make a scale decision. - if (metrics == null || (metrics.Length < NumberOfSamplesToConsider)) - { - return status; - } - - // Maintain a minimum ratio of 1 worker per 1,000 queue messages. - long latestQueueLength = metrics.Last().QueueLength; - if (latestQueueLength > workerCount * 1000) - { - status.Vote = ScaleVote.ScaleOut; - _logger.LogInformation($"QueueLength ({latestQueueLength}) > workerCount ({workerCount}) * 1,000"); - _logger.LogInformation($"Length of queue ({_queue.Name}, {latestQueueLength}) is too high relative to the number of instances ({workerCount})."); - return status; - } - - // Check to see if the queue has been empty for a while. - bool queueIsIdle = metrics.All(p => p.QueueLength == 0); - if (queueIsIdle) - { - status.Vote = ScaleVote.ScaleIn; - _logger.LogInformation($"Queue '{_queue.Name}' is idle"); - return status; - } - - // Samples are in chronological order. Check for a continuous increase in time or length. - // If detected, this results in an automatic scale out. - if (metrics[0].QueueLength > 0) - { - bool queueLengthIncreasing = - IsTrueForLastN( - metrics, - NumberOfSamplesToConsider, - (prev, next) => prev.QueueLength < next.QueueLength); - if (queueLengthIncreasing) - { - status.Vote = ScaleVote.ScaleOut; - _logger.LogInformation($"Queue length is increasing for '{_queue.Name}'"); - return status; - } - } - - if (metrics[0].QueueTime > TimeSpan.Zero && metrics[0].QueueTime < metrics[NumberOfSamplesToConsider - 1].QueueTime) - { - bool queueTimeIncreasing = - IsTrueForLastN( - metrics, - NumberOfSamplesToConsider, - (prev, next) => prev.QueueTime <= next.QueueTime); - if (queueTimeIncreasing) - { - status.Vote = ScaleVote.ScaleOut; - _logger.LogInformation($"Queue time is increasing for '{_queue.Name}'"); - return status; - } - } - - bool queueLengthDecreasing = - IsTrueForLastN( - metrics, - NumberOfSamplesToConsider, - (prev, next) => prev.QueueLength > next.QueueLength); - if (queueLengthDecreasing) - { - status.Vote = ScaleVote.ScaleIn; - _logger.LogInformation($"Queue length is decreasing for '{_queue.Name}'"); - return status; - } - - bool queueTimeDecreasing = IsTrueForLastN( - metrics, - NumberOfSamplesToConsider, - (prev, next) => prev.QueueTime > next.QueueTime); - if (queueTimeDecreasing) - { - status.Vote = ScaleVote.ScaleIn; - _logger.LogInformation($"Queue time is decreasing for '{_queue.Name}'"); - return status; - } - - _logger.LogInformation($"Queue '{_queue.Name}' is steady"); - - return status; - } - - private static bool IsTrueForLastN(IList samples, int count, Func predicate) - { - Debug.Assert(count > 1, "count must be greater than 1."); - Debug.Assert(count <= samples.Count, "count must be less than or equal to the list size."); - - // Walks through the list from left to right starting at len(samples) - count. - for (int i = samples.Count - count; i < samples.Count - 1; i++) - { - if (!predicate(samples[i], samples[i + 1])) - { - return false; - } - } - - return true; + return _scaleMonitor.Value; } } } diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/Queues/QueueMetricsProvider.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/Queues/QueueMetricsProvider.cs new file mode 100644 index 000000000000..269f2111a0ae --- /dev/null +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/Queues/QueueMetricsProvider.cs @@ -0,0 +1,111 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +using System; +using System.Linq; +using System.Threading.Tasks; +using Azure; +using Azure.Storage.Queues; +using Azure.Storage.Queues.Models; +using Microsoft.Extensions.Logging; + +namespace Microsoft.Azure.WebJobs.Extensions.Storage.Common.Listeners +{ + /// + /// Provides QueueTriggerMetrics from a specific queue entity. + /// + internal class QueueMetricsProvider + { + private readonly QueueClient _queue; + private readonly ILogger _logger; + + /// + /// Instantiates a QueueMetricsProvider. + /// + /// The QueueClient to use for metrics polling. + /// Used to create an ILogger instance. + public QueueMetricsProvider(QueueClient queue, ILoggerFactory loggerFactory) + { + _queue = queue; + _logger = loggerFactory.CreateLogger(); + } + + /// + /// Retrieve queue length from the specified queue entity. + /// + /// The queue length from the associated queue entity. + public async Task GetQueueLengthAsync() + { + try + { + QueueProperties queueProperties = await _queue.GetPropertiesAsync().ConfigureAwait(false); + return queueProperties.ApproximateMessagesCount; + } + catch (RequestFailedException ex) + { + if (ex.IsNotFoundQueueNotFound() || + ex.IsConflictQueueBeingDeletedOrDisabled() || + ex.IsServerSideError()) + { + // ignore transient errors, and return default metrics + // E.g. if the queue doesn't exist, we'll return a zero queue length + // and scale in + _logger.LogWarning($"Error querying for queue scale status: {ex.Message}"); + } + } + return 0; + } + + /// + /// Retrieves metrics from the queue entity. + /// + /// Returns a object. + public async Task GetMetricsAsync() + { + int queueLength = 0; + TimeSpan queueTime = TimeSpan.Zero; + + try + { + QueueProperties queueProperties = await _queue.GetPropertiesAsync().ConfigureAwait(false); + queueLength = queueProperties.ApproximateMessagesCount; + + if (queueLength > 0) + { + PeekedMessage message = (await _queue.PeekMessagesAsync(1).ConfigureAwait(false)).Value.FirstOrDefault(); + if (message != null) + { + if (message.InsertedOn.HasValue) + { + queueTime = DateTime.UtcNow.Subtract(message.InsertedOn.Value.DateTime); + } + } + else + { + // ApproximateMessageCount often returns a stale value, + // especially when the queue is empty. + queueLength = 0; + } + } + } + catch (RequestFailedException ex) + { + if (ex.IsNotFoundQueueNotFound() || + ex.IsConflictQueueBeingDeletedOrDisabled() || + ex.IsServerSideError()) + { + // ignore transient errors, and return default metrics + // E.g. if the queue doesn't exist, we'll return a zero queue length + // and scale in + _logger.LogWarning($"Error querying for queue scale status: {ex.Message}"); + } + } + + return new QueueTriggerMetrics + { + QueueLength = queueLength, + QueueTime = queueTime, + Timestamp = DateTime.UtcNow + }; + } + } +} diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/Queues/QueueScaleMonitor.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/Queues/QueueScaleMonitor.cs new file mode 100644 index 000000000000..813256a1d78b --- /dev/null +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/Queues/QueueScaleMonitor.cs @@ -0,0 +1,196 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Globalization; +using System.Linq; +using System.Threading.Tasks; +using Azure.Storage.Queues; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Extensions.Logging; + +namespace Microsoft.Azure.WebJobs.Extensions.Storage.Common.Listeners +{ + /// + /// Used to retrieve metrics and make scale decisions for Queues. + /// + internal class QueueScaleMonitor : IScaleMonitor + { + private const int NumberOfSamplesToConsider = 5; + + private QueueClient _queue; + private ILogger _logger; + private ScaleMonitorDescriptor _scaleMonitorDescriptor; + private QueueMetricsProvider _queueMetricsProvider; + + // mock testing only + internal QueueScaleMonitor() + { + } + + /// + /// Public constructor used to instantiate a QueueScaleMonitor for retrieving metrics and making scale decisions. + /// + /// The function name to make scale decisions for. + /// The queue client to poll metrics from. + /// Used to create an ILogger instance. + public QueueScaleMonitor(string functionId, QueueClient queue, ILoggerFactory loggerFactory) + { + _queue = queue; + _logger = loggerFactory.CreateLogger(); + _scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{functionId}-QueueTrigger-{_queue.Name}".ToLower(CultureInfo.InvariantCulture), functionId); + _queueMetricsProvider = new QueueMetricsProvider(queue, loggerFactory); + } + + /// + /// Retrieves the descriptor of the QueueScaleMonitor with functionId. + /// + public ScaleMonitorDescriptor Descriptor + { + get + { + return _scaleMonitorDescriptor; + } + } + + async Task IScaleMonitor.GetMetricsAsync() + { + return await GetMetricsAsync().ConfigureAwait(false); + } + + /// + /// Retrieves metrics from the queueClient. + /// + /// + public async Task GetMetricsAsync() + { + return await _queueMetricsProvider.GetMetricsAsync().ConfigureAwait(false); + } + + ScaleStatus IScaleMonitor.GetScaleStatus(ScaleStatusContext context) + { + return GetScaleStatusCore(context.WorkerCount, context.Metrics?.Cast().ToArray()); + } + + /// + /// Using a ScaleStatusContext, return an incremental scale decision. + /// + /// The ScaleStatusContext containing the worker count and trigger metrics to make scale decisions for. + /// Returns a vote to scale out, maintain state, or scale in. + public ScaleStatus GetScaleStatus(ScaleStatusContext context) + { + return GetScaleStatusCore(context.WorkerCount, context.Metrics?.ToArray()); + } + + private ScaleStatus GetScaleStatusCore(int workerCount, QueueTriggerMetrics[] metrics) + { + ScaleStatus status = new ScaleStatus + { + Vote = ScaleVote.None + }; + + // verify we have enough samples to make a scale decision. + if (metrics == null || (metrics.Length < NumberOfSamplesToConsider)) + { + return status; + } + + // Maintain a minimum ratio of 1 worker per 1,000 queue messages. + long latestQueueLength = metrics.Last().QueueLength; + if (latestQueueLength > workerCount * 1000) + { + status.Vote = ScaleVote.ScaleOut; + _logger.LogInformation($"QueueLength ({latestQueueLength}) > workerCount ({workerCount}) * 1,000"); + _logger.LogInformation($"Length of queue ({_queue.Name}, {latestQueueLength}) is too high relative to the number of instances ({workerCount})."); + return status; + } + + // Check to see if the queue has been empty for a while. + bool queueIsIdle = metrics.All(p => p.QueueLength == 0); + if (queueIsIdle) + { + status.Vote = ScaleVote.ScaleIn; + _logger.LogInformation($"Queue '{_queue.Name}' is idle"); + return status; + } + + // Samples are in chronological order. Check for a continuous increase in time or length. + // If detected, this results in an automatic scale out. + if (metrics[0].QueueLength > 0) + { + bool queueLengthIncreasing = + IsTrueForLastN( + metrics, + NumberOfSamplesToConsider, + (prev, next) => prev.QueueLength < next.QueueLength); + if (queueLengthIncreasing) + { + status.Vote = ScaleVote.ScaleOut; + _logger.LogInformation($"Queue length is increasing for '{_queue.Name}'"); + return status; + } + } + + if (metrics[0].QueueTime > TimeSpan.Zero && metrics[0].QueueTime < metrics[NumberOfSamplesToConsider - 1].QueueTime) + { + bool queueTimeIncreasing = + IsTrueForLastN( + metrics, + NumberOfSamplesToConsider, + (prev, next) => prev.QueueTime <= next.QueueTime); + if (queueTimeIncreasing) + { + status.Vote = ScaleVote.ScaleOut; + _logger.LogInformation($"Queue time is increasing for '{_queue.Name}'"); + return status; + } + } + + bool queueLengthDecreasing = + IsTrueForLastN( + metrics, + NumberOfSamplesToConsider, + (prev, next) => prev.QueueLength > next.QueueLength); + if (queueLengthDecreasing) + { + status.Vote = ScaleVote.ScaleIn; + _logger.LogInformation($"Queue length is decreasing for '{_queue.Name}'"); + return status; + } + + bool queueTimeDecreasing = IsTrueForLastN( + metrics, + NumberOfSamplesToConsider, + (prev, next) => prev.QueueTime > next.QueueTime); + if (queueTimeDecreasing) + { + status.Vote = ScaleVote.ScaleIn; + _logger.LogInformation($"Queue time is decreasing for '{_queue.Name}'"); + return status; + } + + _logger.LogInformation($"Queue '{_queue.Name}' is steady"); + + return status; + } + + private static bool IsTrueForLastN(IList samples, int count, Func predicate) + { + Debug.Assert(count > 1, "count must be greater than 1."); + Debug.Assert(count <= samples.Count, "count must be less than or equal to the list size."); + + // Walks through the list from left to right starting at len(samples) - count. + for (int i = samples.Count - count; i < samples.Count - 1; i++) + { + if (!predicate(samples[i], samples[i + 1])) + { + return false; + } + } + + return true; + } + } +} diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/Queues/QueueTargetScaler.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/Queues/QueueTargetScaler.cs new file mode 100644 index 000000000000..9b30b2914d2b --- /dev/null +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Common/src/Shared/Queues/QueueTargetScaler.cs @@ -0,0 +1,74 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +using Azure; +using Azure.Storage.Queues; +using Microsoft.Azure.WebJobs.Host; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Extensions.Logging; +using System; +using System.Threading.Tasks; + +namespace Microsoft.Azure.WebJobs.Extensions.Storage.Common.Listeners +{ + /// + /// Provides queue length metrics for the . + /// + internal sealed class QueueTargetScaler : ITargetScaler + { + private readonly string _functionId; + private readonly string _queueName; + private readonly QueueMetricsProvider _queueMetricsProvider; + private readonly TargetScalerDescriptor _targetScalerDescriptor; + private QueuesOptions _options; + private readonly ILogger _logger; + + // internal mock testing only + internal QueueTargetScaler() + { + } + + /// + /// Constructs a new instance. + /// + public TargetScalerDescriptor TargetScalerDescriptor => _targetScalerDescriptor; + + internal QueueTargetScaler(string functionId, QueueClient queueClient, QueuesOptions options, ILoggerFactory loggerFactory) + { + _functionId = functionId; + _queueName = queueClient.Name; + _queueMetricsProvider = new QueueMetricsProvider(queueClient, loggerFactory); + _targetScalerDescriptor = new TargetScalerDescriptor(functionId); + _options = options; + _logger = loggerFactory.CreateLogger(); + } + + /// + /// Makes a target scale decision based on most recent metrics for the specified queue. + /// + /// The TargetScalerContext, which contains the InstanceConcurrency, or the targetMetric used in target based scaling. + /// Returns a TargetScalerResult with a TargetWorkerCount. + public async Task GetScaleResultAsync(TargetScalerContext context) + { + int queueLength = await _queueMetricsProvider.GetQueueLengthAsync().ConfigureAwait(false); + return GetScaleResultInternal(context, queueLength); + } + + internal TargetScalerResult GetScaleResultInternal(TargetScalerContext context, int queueLength) + { + int concurrency = !context.InstanceConcurrency.HasValue ? _options.BatchSize : context.InstanceConcurrency.Value; + + if (concurrency < 0) + { + throw new ArgumentOutOfRangeException($"Concurrency value='{concurrency}' used for target based scale must be > 0"); + } + + int targetWorkerCount = (int)Math.Ceiling(queueLength / (decimal)concurrency); + + _logger.LogInformation($"'Target worker count for function '{_functionId}' is '{targetWorkerCount}' (QueueName='{_queueName}', QueueLength ='{queueLength}', Concurrency='{concurrency}')."); + return new TargetScalerResult + { + TargetWorkerCount = targetWorkerCount + }; + } + } +} diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/tests/QueueListenerTests.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/tests/QueueListenerTests.cs index fd4e3cfd5c31..2de9c863ac96 100644 --- a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/tests/QueueListenerTests.cs +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/tests/QueueListenerTests.cs @@ -14,7 +14,6 @@ using Microsoft.Azure.WebJobs.Extensions.Storage.Common; using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Listeners; using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Tests; -using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Timers; using Microsoft.Azure.WebJobs.Extensions.Storage.Queues.Listeners; using Microsoft.Azure.WebJobs.Host; using Microsoft.Azure.WebJobs.Host.Executors; @@ -37,6 +36,7 @@ public class QueueListenerTests : LiveTestBase private const string TestFunctionId = "TestFunction"; private Mock _mockQueue; private QueueListener _listener; + private QueueScaleMonitor _scaleMonitor; private Mock _mockQueueProcessor; private Mock> _mockTriggerExecutor; private Mock _mockExceptionDispatcher; @@ -78,6 +78,7 @@ public void SetUp() var concurrencyManagerMock = new Mock(MockBehavior.Strict); _listener = new QueueListener(_mockQueue.Object, null, _mockTriggerExecutor.Object, _mockExceptionDispatcher.Object, _loggerFactory, null, _queuesOptions, _mockQueueProcessor.Object, new FunctionDescriptor { Id = TestFunctionId }, concurrencyManagerMock.Object); + _scaleMonitor = new QueueScaleMonitor(TestFunctionId, _mockQueue.Object, _loggerFactory); _queueMessage = QueuesModelFactory.QueueMessage("TestId", "TestPopReceipt", "TestMessage", 0); } @@ -113,64 +114,7 @@ public void GetMessageReceiveCount_ReturnsExpectedValue() [Test] public void ScaleMonitor_Id_ReturnsExpectedValue() { - Assert.AreEqual("testfunction-queuetrigger-testqueue", _listener.Descriptor.Id); - } - - [Test] - public async Task GetMetrics_ReturnsExpectedResult() - { - var queuesOptions = new QueuesOptions(); - Mock> mockTriggerExecutor = new Mock>(MockBehavior.Strict); - var mockConcurrencyManager = new Mock(MockBehavior.Strict); - var queueProcessorFactory = new DefaultQueueProcessorFactory(); - var queueProcessor = QueueListenerFactory.CreateQueueProcessor(Fixture.Queue, null, _loggerFactory, queueProcessorFactory, queuesOptions, null); - QueueListener listener = new QueueListener(Fixture.Queue, null, mockTriggerExecutor.Object, new WebJobsExceptionHandler(null), - _loggerFactory, null, queuesOptions, queueProcessor, new FunctionDescriptor { Id = "TestFunction" }, mockConcurrencyManager.Object); - - var metrics = await listener.GetMetricsAsync(); - - Assert.AreEqual(0, metrics.QueueLength); - Assert.AreEqual(TimeSpan.Zero, metrics.QueueTime); - Assert.AreNotEqual(default(DateTime), metrics.Timestamp); - - // add some test messages - for (int i = 0; i < 5; i++) - { - await Fixture.Queue.SendMessageAsync($"Message {i}"); - } - - await Task.Delay(TimeSpan.FromSeconds(5)); - - metrics = await listener.GetMetricsAsync(); - - Assert.AreEqual(5, metrics.QueueLength); - Assert.True(metrics.QueueTime.Ticks > 0); - Assert.AreNotEqual(default(DateTime), metrics.Timestamp); - - // verify non-generic interface works as expected - metrics = (QueueTriggerMetrics)(await ((IScaleMonitor)listener).GetMetricsAsync()); - Assert.AreEqual(5, metrics.QueueLength); - } - - [Test] - public async Task GetMetrics_HandlesStorageExceptions() - { - var exception = new RequestFailedException( - 500, - "Things are very wrong.", - default, - new Exception()); - - _mockQueue.Setup(p => p.GetPropertiesAsync(It.IsAny())).Throws(exception); - - var metrics = await _listener.GetMetricsAsync(); - - Assert.AreEqual(0, metrics.QueueLength); - Assert.AreEqual(TimeSpan.Zero, metrics.QueueTime); - Assert.AreNotEqual(default(DateTime), metrics.Timestamp); - - var warning = _loggerProvider.GetAllLogMessages().Single(p => p.Level == Microsoft.Extensions.Logging.LogLevel.Warning); - Assert.AreEqual("Error querying for queue scale status: Things are very wrong.", warning.FormattedMessage); + Assert.AreEqual("testfunction-queuetrigger-testqueue", _scaleMonitor.Descriptor.Id); } [Test] @@ -181,11 +125,11 @@ public void GetScaleStatus_NoMetrics_ReturnsVote_None() WorkerCount = 1 }; - var status = _listener.GetScaleStatus(context); + var status = _scaleMonitor.GetScaleStatus(context); Assert.AreEqual(ScaleVote.None, status.Vote); // verify the non-generic implementation works properly - status = ((IScaleMonitor)_listener).GetScaleStatus(context); + status = _scaleMonitor.GetScaleStatus(context); Assert.AreEqual(ScaleVote.None, status.Vote); } @@ -208,7 +152,7 @@ public void GetScaleStatus_MessagesPerWorkerThresholdExceeded_ReturnsVote_ScaleO }; context.Metrics = queueTriggerMetrics; - var status = _listener.GetScaleStatus(context); + var status = _scaleMonitor.GetScaleStatus(context); Assert.AreEqual(ScaleVote.ScaleOut, status.Vote); @@ -226,7 +170,7 @@ public void GetScaleStatus_MessagesPerWorkerThresholdExceeded_ReturnsVote_ScaleO WorkerCount = 1, Metrics = queueTriggerMetrics }; - status = ((IScaleMonitor)_listener).GetScaleStatus(context2); + status = ((IScaleMonitor)_scaleMonitor).GetScaleStatus(context2); Assert.AreEqual(ScaleVote.ScaleOut, status.Vote); } @@ -248,7 +192,7 @@ public void GetScaleStatus_QueueLengthIncreasing_ReturnsVote_ScaleOut() new QueueTriggerMetrics { QueueLength = 150, QueueTime = TimeSpan.FromSeconds(1), Timestamp = timestamp.AddSeconds(15) } }; - var status = _listener.GetScaleStatus(context); + var status = _scaleMonitor.GetScaleStatus(context); Assert.AreEqual(ScaleVote.ScaleOut, status.Vote); @@ -276,7 +220,7 @@ public void GetScaleStatus_QueueTimeIncreasing_ReturnsVote_ScaleOut() new QueueTriggerMetrics { QueueLength = 100, QueueTime = TimeSpan.FromSeconds(6), Timestamp = timestamp.AddSeconds(15) } }; - var status = _listener.GetScaleStatus(context); + var status = _scaleMonitor.GetScaleStatus(context); Assert.AreEqual(ScaleVote.ScaleOut, status.Vote); @@ -304,7 +248,7 @@ public void GetScaleStatus_QueueLengthDecreasing_ReturnsVote_ScaleIn() new QueueTriggerMetrics { QueueLength = 10, QueueTime = TimeSpan.FromMilliseconds(400), Timestamp = timestamp.AddSeconds(15) } }; - var status = _listener.GetScaleStatus(context); + var status = _scaleMonitor.GetScaleStatus(context); Assert.AreEqual(ScaleVote.ScaleIn, status.Vote); @@ -332,7 +276,7 @@ public void GetScaleStatus_QueueTimeDecreasing_ReturnsVote_ScaleIn() new QueueTriggerMetrics { QueueLength = 100, QueueTime = TimeSpan.FromMilliseconds(100), Timestamp = timestamp.AddSeconds(15) } }; - var status = _listener.GetScaleStatus(context); + var status = _scaleMonitor.GetScaleStatus(context); Assert.AreEqual(ScaleVote.ScaleIn, status.Vote); @@ -360,7 +304,7 @@ public void GetScaleStatus_QueueSteady_ReturnsVote_None() new QueueTriggerMetrics { QueueLength = 1600, QueueTime = TimeSpan.FromSeconds(1), Timestamp = timestamp.AddSeconds(15) } }; - var status = _listener.GetScaleStatus(context); + var status = _scaleMonitor.GetScaleStatus(context); Assert.AreEqual(ScaleVote.None, status.Vote); @@ -388,7 +332,7 @@ public void GetScaleStatus_QueueIdle_ReturnsVote_ScaleOut() new QueueTriggerMetrics { QueueLength = 0, QueueTime = TimeSpan.Zero, Timestamp = timestamp.AddSeconds(15) } }; - var status = _listener.GetScaleStatus(context); + var status = _scaleMonitor.GetScaleStatus(context); Assert.AreEqual(ScaleVote.ScaleIn, status.Vote); @@ -411,7 +355,7 @@ public void GetScaleStatus_UnderSampleCountThreshold_ReturnsVote_None() new QueueTriggerMetrics { QueueLength = 10, QueueTime = TimeSpan.FromSeconds(1), Timestamp = DateTime.UtcNow } }; - var status = _listener.GetScaleStatus(context); + var status = _scaleMonitor.GetScaleStatus(context); Assert.AreEqual(ScaleVote.None, status.Vote); } @@ -661,6 +605,24 @@ public async Task ProcessMessageAsync_FunctionInvocationFails() await _listener.ProcessMessageAsync(_queueMessage, TimeSpan.FromMinutes(10), cancellationToken); } + + [Test] + public void Get_TargetScale_IsNotNull() + { + var concurrencyOptions = new ConcurrencyOptions + { + DynamicConcurrencyEnabled = true + }; + var throttleStatus = new ConcurrencyThrottleAggregateStatus { State = ThrottleState.Disabled }; + var optionsWrapper = new OptionsWrapper(concurrencyOptions); + var mockConcurrencyThrottleManager = new Mock(MockBehavior.Strict); + mockConcurrencyThrottleManager.Setup(p => p.GetStatus()).Returns(() => throttleStatus); + var concurrencyManager = new ConcurrencyManager(optionsWrapper, _loggerFactory, mockConcurrencyThrottleManager.Object); + var localListener = new QueueListener(_mockQueue.Object, null, _mockTriggerExecutor.Object, _mockExceptionDispatcher.Object, _loggerFactory, null, _queuesOptions, _mockQueueProcessor.Object, new FunctionDescriptor { Id = TestFunctionId }, concurrencyManager); + + var result = localListener.GetTargetScaler(); + Assert.IsNotNull(result); + } public class TestFixture : IDisposable { private const string TestQueuePrefix = "queuelistenertests"; diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/tests/QueueMetricsProviderTests.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/tests/QueueMetricsProviderTests.cs new file mode 100644 index 000000000000..8b99608e3c84 --- /dev/null +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/tests/QueueMetricsProviderTests.cs @@ -0,0 +1,168 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using System.Threading; +using Azure; +using Azure.Core.TestFramework; +using Azure.Storage.Queues; +using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Listeners; +using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Tests; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Moq; +using NUnit.Framework; + +namespace Microsoft.Azure.WebJobs.Extensions.Storage.Queues.Tests +{ + internal class QueueMetricsProviderTests : LiveTestBase + { + private ILoggerFactory _loggerFactory; + private TestLoggerProvider _loggerProvider; + private QueueMetricsProvider _metricsProvider; + private Mock _mockQueue; + + [SetUp] + public void SetUp() + { + _loggerFactory = new LoggerFactory(); + _loggerProvider = new TestLoggerProvider(); + _loggerFactory.AddProvider(_loggerProvider); + _mockQueue = new Mock(new Uri("https://test.queue.core.windows.net/testqueue"), new QueueClientOptions()); + _metricsProvider = new QueueMetricsProvider(_mockQueue.Object, _loggerFactory); + } + + [OneTimeSetUp] + public void OneTimeSetUp() + { + Fixture = new TestFixture(); + } + + [OneTimeTearDown] + public void OneTimeTearDown() + { + Fixture.Dispose(); + } + + public TestFixture Fixture { get; set; } + + [Test] + public async Task GetMetrics_ReturnsExpectedResult() + { + var metrics = await _metricsProvider.GetMetricsAsync(); + + Assert.AreEqual(0, metrics.QueueLength); + Assert.AreEqual(TimeSpan.Zero, metrics.QueueTime); + Assert.AreNotEqual(default(DateTime), metrics.Timestamp); + + // add some test messages + for (int i = 0; i < 5; i++) + { + await Fixture.Queue.SendMessageAsync($"Message {i}"); + } + + await Task.Delay(TimeSpan.FromSeconds(5)); + + metrics = await _metricsProvider.GetMetricsAsync(); + + Assert.AreEqual(5, metrics.QueueLength); + Assert.True(metrics.QueueTime.Ticks > 0); + Assert.AreNotEqual(default(DateTime), metrics.Timestamp); + + // verify non-generic interface works as expected + metrics = (QueueTriggerMetrics)(await _metricsProvider.GetMetricsAsync()); + Assert.AreEqual(5, metrics.QueueLength); + } + + [Test] + public async Task GetMetrics_HandlesStorageExceptions() + { + var exception = new RequestFailedException( + 500, + "Things are very wrong.", + default, + new Exception()); + + _mockQueue.Setup(p => p.GetPropertiesAsync(It.IsAny())).Throws(exception); + + var metrics = await _metricsProvider.GetMetricsAsync(); + + Assert.AreEqual(0, metrics.QueueLength); + Assert.AreEqual(TimeSpan.Zero, metrics.QueueTime); + Assert.AreNotEqual(default(DateTime), metrics.Timestamp); + + var warning = _loggerProvider.GetAllLogMessages().Single(p => p.Level == Microsoft.Extensions.Logging.LogLevel.Warning); + Assert.AreEqual("Error querying for queue scale status: Things are very wrong.", warning.FormattedMessage); + } + + public class TestFixture : IDisposable + { + private const string TestQueuePrefix = "queuelistenertests"; + + public TestFixture() + { + // Create a default host to get some default services + IHost host = new HostBuilder() + .ConfigureDefaultTestHost(b => + { + b.AddAzureStorageQueues(); + }) + .Build(); + + var queueServiceClientProvider = host.Services.GetRequiredService(); + QueueClient = queueServiceClientProvider.GetHost(); + + string queueName = string.Format("{0}-{1}", TestQueuePrefix, Guid.NewGuid()); + Queue = QueueClient.GetQueueClient(queueName); + Queue.CreateIfNotExistsAsync().Wait(); + + string poisonQueueName = string.Format("{0}-poison", queueName); + PoisonQueue = QueueClient.GetQueueClient(poisonQueueName); + PoisonQueue.CreateIfNotExistsAsync().Wait(); + } + + public QueueClient Queue + { + get; + private set; + } + + public QueueClient PoisonQueue + { + get; + private set; + } + + public QueueServiceClient QueueClient + { + get; + private set; + } + + public QueueClient CreateNewQueue() + { + string queueName = string.Format("{0}-{1}", TestQueuePrefix, Guid.NewGuid()); + var queue = QueueClient.GetQueueClient(queueName); + queue.CreateIfNotExistsAsync().Wait(); + return queue; + } + + public void Dispose() + { + var result = QueueClient.GetQueues(prefix: TestQueuePrefix); + var tasks = new List(); + + foreach (var queue in result) + { + tasks.Add(QueueClient.GetQueueClient(queue.Name).DeleteAsync()); + } + + Task.WaitAll(tasks.ToArray()); + } + } + } +} diff --git a/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/tests/QueueTargetScalerTests.cs b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/tests/QueueTargetScalerTests.cs new file mode 100644 index 000000000000..410be2e6e749 --- /dev/null +++ b/sdk/storage/Microsoft.Azure.WebJobs.Extensions.Storage.Queues/tests/QueueTargetScalerTests.cs @@ -0,0 +1,54 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +using Azure.Storage.Queues; +using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Listeners; +using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Tests; +using Microsoft.Azure.WebJobs.Host; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Extensions.Logging; +using Moq; +using NUnit.Framework; + +namespace Microsoft.Azure.WebJobs.Extensions.Storage.Queues.Tests +{ + public class QueueTargetScalerTests + { + private ILoggerFactory _loggerFactory; + private TestLoggerProvider _loggerProvider; + + [SetUp] + public void Setup() + { + _loggerFactory = new LoggerFactory(); + _loggerProvider = new TestLoggerProvider(); + _loggerFactory.AddProvider(_loggerProvider); + } + + [Test] + [TestCase(160, null, 10)] + [TestCase(150, null, 10)] + [TestCase(144, null, 9)] + [TestCase(160, 20, 8)] + public void QueueTargetScaler_Returns_Expected(int queueLength, int? concurrency, int expectedTargetWorkerCount) + { + QueuesOptions options = new QueuesOptions { BatchSize = 16 }; + + TargetScalerContext context = new TargetScalerContext + { + InstanceConcurrency = concurrency + }; + + _loggerFactory = new LoggerFactory(); + _loggerProvider = new TestLoggerProvider(); + _loggerFactory.AddProvider(_loggerProvider); + + Mock mockQueueClient = new Mock(); + mockQueueClient.Setup(q => q.Name).Returns("testQueue"); + + QueueTargetScaler targetScaler = new QueueTargetScaler("testFunctionId", mockQueueClient.Object, options, _loggerFactory); + TargetScalerResult result = targetScaler.GetScaleResultInternal(context, queueLength); + Assert.AreEqual(expectedTargetWorkerCount, result.TargetWorkerCount); + } + } +}