diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs index c0ae6c87bc73..8c94873d354a 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs @@ -9,8 +9,10 @@ using System.Threading.Tasks; using Azure.Core.Pipeline; using Azure.Messaging.ServiceBus; +using Azure.Messaging.ServiceBus.Administration; using Azure.Messaging.ServiceBus.Diagnostics; using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config; +using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Listeners; using Microsoft.Azure.WebJobs.Host.Executors; using Microsoft.Azure.WebJobs.Host.Listeners; using Microsoft.Azure.WebJobs.Host.Scale; @@ -18,7 +20,7 @@ namespace Microsoft.Azure.WebJobs.ServiceBus.Listeners { - internal sealed class ServiceBusListener : IListener, IScaleMonitorProvider + internal sealed class ServiceBusListener : IListener, IScaleMonitorProvider, ITargetScalerProvider { private readonly ITriggeredFunctionExecutor _triggerExecutor; private readonly string _entityPath; @@ -34,6 +36,8 @@ internal sealed class ServiceBusListener : IListener, IScaleMonitorProvider private readonly Lazy _client; private readonly Lazy _sessionMessageProcessor; private readonly Lazy _scaleMonitor; + private readonly Lazy _targetScaler; + private readonly Lazy _administrationClient; private readonly ConcurrencyUpdateManager _concurrencyUpdateManager; // internal for testing @@ -72,8 +76,7 @@ public ServiceBusListener( _functionId = functionId; _client = new Lazy( - () => - clientFactory.CreateClientFromSetting(connection)); + () => clientFactory.CreateClientFromSetting(connection)); _batchReceiver = new Lazy( () => messagingProvider.CreateBatchMessageReceiver( @@ -95,15 +98,31 @@ public ServiceBusListener( return messagingProvider.CreateSessionMessageProcessor(_client.Value,_entityPath, sessionProcessorOptions); }); + _administrationClient = new Lazy( + () => clientFactory.CreateAdministrationClient(connection)); + _scaleMonitor = new Lazy( () => new ServiceBusScaleMonitor( functionId, + _entityPath, entityType, + _batchReceiver, + _administrationClient, + loggerFactory + )); + + _targetScaler = new Lazy( + () => new ServiceBusTargetScaler( + functionId, _entityPath, - connection, + entityType, _batchReceiver, - loggerFactory, - clientFactory)); + _administrationClient, + options, + _isSessionsEnabled, + _singleDispatch, + loggerFactory + )); _scopeFactory = new Lazy( () => new EntityScopeFactory(_batchReceiver.Value.EntityPath, _batchReceiver.Value.FullyQualifiedNamespace)); @@ -536,6 +555,11 @@ public IScaleMonitor GetMonitor() return _scaleMonitor.Value; } + public ITargetScaler GetTargetScaler() + { + return _targetScaler.Value; + } + /// /// Responsible for handling dynamic concurrency concurrency updates for message processors. /// diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusMetricsProvider.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusMetricsProvider.cs new file mode 100644 index 000000000000..41509a2c2a49 --- /dev/null +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusMetricsProvider.cs @@ -0,0 +1,239 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using Microsoft.Azure.WebJobs.ServiceBus.Listeners; +using Microsoft.Azure.WebJobs.ServiceBus; +using System; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Azure.Messaging.ServiceBus; +using Azure.Messaging.ServiceBus.Administration; +using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config; + +namespace Microsoft.Azure.WebJobs.Extensions.ServiceBus.Listeners +{ + internal class ServiceBusMetricsProvider + { + private const string DeadLetterQueuePath = @"/$DeadLetterQueue"; + + private readonly ILogger _logger; + private readonly string _entityPath; + private readonly ServiceBusEntityType _serviceBusEntityType; + private readonly Lazy _receiver; + private readonly bool _isListeningOnDeadLetterQueue; + private readonly Lazy _administrationClient; + + private DateTime _nextWarningTime; + + public ServiceBusMetricsProvider( + string entityPath, + ServiceBusEntityType serviceBusEntityType, + Lazy receiver, + Lazy administrationClient, + ILoggerFactory loggerFactory) + { + _serviceBusEntityType = serviceBusEntityType; + _receiver = receiver; + _entityPath = entityPath; + _isListeningOnDeadLetterQueue = entityPath.EndsWith(DeadLetterQueuePath, StringComparison.OrdinalIgnoreCase); + _administrationClient = administrationClient; + _logger = loggerFactory.CreateLogger(); + _nextWarningTime = DateTime.UtcNow; + } + + public async Task GetMessageCountAsync() + { + long activeMessageCount = 0; + long deadLetterCount = 0; + string entityName = _serviceBusEntityType == ServiceBusEntityType.Queue ? "queue" : "topic"; + try + { + if (_serviceBusEntityType == ServiceBusEntityType.Queue) + { + QueueRuntimeProperties queueRuntimeProperties = await _administrationClient.Value.GetQueueRuntimePropertiesAsync(_entityPath).ConfigureAwait(false); + activeMessageCount = queueRuntimeProperties.ActiveMessageCount; + deadLetterCount = queueRuntimeProperties.DeadLetterMessageCount; + } + else + { + ServiceBusEntityPathHelper.ParseTopicAndSubscription(_entityPath, out string topicPath, out string subscriptionPath); + + SubscriptionRuntimeProperties subscriptionProperties = await _administrationClient.Value.GetSubscriptionRuntimePropertiesAsync(topicPath, subscriptionPath).ConfigureAwait(false); + activeMessageCount = subscriptionProperties.ActiveMessageCount; + deadLetterCount = subscriptionProperties.DeadLetterMessageCount; + } + } + catch (ServiceBusException ex) + when (ex.Reason == ServiceBusFailureReason.MessagingEntityNotFound) + { + _logger.LogWarning($"ServiceBus {entityName} '{_entityPath}' was not found."); + } + catch (UnauthorizedAccessException ex) + { + if (TimeToLogWarning()) + { + _logger.LogWarning($"Connection string does not have 'Manage Claim' for {entityName} '{_entityPath}'. Unable to determine active message count.", ex); + } + throw ex; + } + catch (Exception e) + { + _logger.LogWarning($"Error querying for Service Bus {entityName} scale status: {e.Message}"); + } + + long totalNewMessageCount = 0; + if ((!_isListeningOnDeadLetterQueue && activeMessageCount > 0) || (_isListeningOnDeadLetterQueue && deadLetterCount > 0)) + { + totalNewMessageCount = _isListeningOnDeadLetterQueue ? deadLetterCount : activeMessageCount; + } + + return totalNewMessageCount; + } + + public async Task GetMetricsAsync() + { + ServiceBusReceivedMessage activeMessage = null; + string entityName = _serviceBusEntityType == ServiceBusEntityType.Queue ? "queue" : "topic"; + + try + { + // Do a first attempt to peek one message from the head of the queue + var peekedMessage = await _receiver.Value.PeekMessageAsync(fromSequenceNumber: 0).ConfigureAwait(false); + if (peekedMessage == null) + { + // ignore it. The Get[Queue|Topic]MetricsAsync methods deal with activeMessage being null + } + else if (peekedMessage.State == ServiceBusMessageState.Active) + { + activeMessage = peekedMessage; + } + else + { + // Do another attempt to peek ten message from last peek sequence number + var peekedMessages = await _receiver.Value.PeekMessagesAsync(10, fromSequenceNumber: peekedMessage.SequenceNumber).ConfigureAwait(false); + foreach (var receivedMessage in peekedMessages) + { + if (receivedMessage.State == ServiceBusMessageState.Active) + { + activeMessage = receivedMessage; + break; + } + } + + // Batch contains messages but none are active in the peeked batch + if (peekedMessages.Count > 0 && activeMessage == null) + { + _logger.LogDebug("{_serviceBusEntityType} {_entityPath} contains multiple messages but none are active in the peeked batch."); + } + } + + if (_serviceBusEntityType == ServiceBusEntityType.Queue) + { + return await GetQueueMetricsAsync(activeMessage).ConfigureAwait(false); + } + else + { + return await GetTopicMetricsAsync(activeMessage).ConfigureAwait(false); + } + } + catch (ServiceBusException ex) + when (ex.Reason == ServiceBusFailureReason.MessagingEntityNotFound) + { + _logger.LogWarning($"ServiceBus {entityName} '{_entityPath}' was not found."); + } + catch (UnauthorizedAccessException) // When manage claim is not used on Service Bus connection string + { + if (TimeToLogWarning()) + { + _logger.LogWarning($"Connection string does not have Manage claim for {entityName} '{_entityPath}'. Failed to get {entityName} description to " + + $"derive {entityName} length metrics. Falling back to using first message enqueued time."); + } + } + catch (Exception e) + { + _logger.LogWarning($"Error querying for Service Bus {entityName} scale status: {e.Message}"); + } + + // Path for connection strings with no manage claim + return CreateTriggerMetrics(activeMessage, 0, 0, 0, _isListeningOnDeadLetterQueue); + } + + private async Task GetQueueMetricsAsync(ServiceBusReceivedMessage message) + { + QueueRuntimeProperties queueRuntimeProperties; + QueueProperties queueProperties; + long activeMessageCount = 0; + long deadLetterCount = 0; + int partitionCount = 0; + + queueRuntimeProperties = await _administrationClient.Value.GetQueueRuntimePropertiesAsync(_entityPath).ConfigureAwait(false); + activeMessageCount = queueRuntimeProperties.ActiveMessageCount; + deadLetterCount = queueRuntimeProperties.DeadLetterMessageCount; + + // If partitioning is turned on, then Service Bus automatically partitions queues into 16 partitions + // See more information here: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-partitioning#standard + queueProperties = await _administrationClient.Value.GetQueueAsync(_entityPath).ConfigureAwait(false); + partitionCount = queueProperties.EnablePartitioning ? 16 : 0; + + return CreateTriggerMetrics(message, activeMessageCount, deadLetterCount, partitionCount, _isListeningOnDeadLetterQueue); + } + + private async Task GetTopicMetricsAsync(ServiceBusReceivedMessage message) + { + TopicProperties topicProperties; + SubscriptionRuntimeProperties subscriptionProperties; + string topicPath, subscriptionPath; + long activeMessageCount = 0; + long deadLetterCount = 0; + int partitionCount = 0; + + ServiceBusEntityPathHelper.ParseTopicAndSubscription(_entityPath, out topicPath, out subscriptionPath); + + subscriptionProperties = await _administrationClient.Value.GetSubscriptionRuntimePropertiesAsync(topicPath, subscriptionPath).ConfigureAwait(false); + activeMessageCount = subscriptionProperties.ActiveMessageCount; + deadLetterCount = subscriptionProperties.DeadLetterMessageCount; + + // If partitioning is turned on, then Service Bus automatically partitions queues into 16 partitions + // See more information here: https://docs.microsoft.com/azure/service-bus-messaging/service-bus-partitioning#standard + topicProperties = await _administrationClient.Value.GetTopicAsync(topicPath).ConfigureAwait(false); + partitionCount = topicProperties.EnablePartitioning ? 16 : 0; + + return CreateTriggerMetrics(message, activeMessageCount, deadLetterCount, partitionCount, _isListeningOnDeadLetterQueue); + } + + private bool TimeToLogWarning() + { + DateTime currentTime = DateTime.UtcNow; + bool timeToLog = currentTime >= _nextWarningTime; + if (timeToLog) + { + _nextWarningTime = currentTime.AddHours(1); + } + return timeToLog; + } + + internal static ServiceBusTriggerMetrics CreateTriggerMetrics(ServiceBusReceivedMessage message, long activeMessageCount, long deadLetterCount, int partitionCount, bool isListeningOnDeadLetterQueue) + { + long totalNewMessageCount = 0; + TimeSpan queueTime = TimeSpan.Zero; + + if (message != null) + { + queueTime = DateTimeOffset.UtcNow.Subtract(message.EnqueuedTime); + totalNewMessageCount = 1; // There's at least one if message != null. Default for connection string with no manage claim + } + + if ((!isListeningOnDeadLetterQueue && activeMessageCount > 0) || (isListeningOnDeadLetterQueue && deadLetterCount > 0)) + { + totalNewMessageCount = isListeningOnDeadLetterQueue ? deadLetterCount : activeMessageCount; + } + + return new ServiceBusTriggerMetrics + { + MessageCount = totalNewMessageCount, + PartitionCount = partitionCount, + QueueTime = queueTime + }; + } + } +} diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusScaleMonitor.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusScaleMonitor.cs index 692b75ce5327..bbccb970c43b 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusScaleMonitor.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusScaleMonitor.cs @@ -11,35 +11,32 @@ using Azure.Messaging.ServiceBus; using Azure.Messaging.ServiceBus.Administration; using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config; +using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Listeners; namespace Microsoft.Azure.WebJobs.ServiceBus.Listeners { internal class ServiceBusScaleMonitor : IScaleMonitor { - private const string DeadLetterQueuePath = @"/$DeadLetterQueue"; - private readonly string _functionId; - private readonly ServiceBusEntityType _serviceBusEntityType; private readonly string _entityPath; private readonly ScaleMonitorDescriptor _scaleMonitorDescriptor; - private readonly bool _isListeningOnDeadLetterQueue; - private readonly Lazy _receiver; - private readonly Lazy _administrationClient; private readonly ILogger _logger; - - private DateTime _nextWarningTime; - - public ServiceBusScaleMonitor(string functionId, ServiceBusEntityType serviceBusEntityType, string entityPath, string connection, Lazy receiver, ILoggerFactory loggerFactory, ServiceBusClientFactory clientFactory) + private readonly ServiceBusMetricsProvider _serviceBusMetricsProvider; + + public ServiceBusScaleMonitor( + string functionId, + string entityPath, + ServiceBusEntityType entityType, + Lazy receiver, + Lazy administrationClient, + ILoggerFactory loggerFactory + ) { _functionId = functionId; - _serviceBusEntityType = serviceBusEntityType; _entityPath = entityPath; - _scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{_functionId}-ServiceBusTrigger-{_entityPath}".ToLower(CultureInfo.InvariantCulture), _functionId); - _isListeningOnDeadLetterQueue = entityPath.EndsWith(DeadLetterQueuePath, StringComparison.OrdinalIgnoreCase); - _receiver = receiver; - _administrationClient = new Lazy(() => clientFactory.CreateAdministrationClient(connection)); + _serviceBusMetricsProvider = new ServiceBusMetricsProvider(entityPath, entityType, receiver, administrationClient, loggerFactory); + _scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{_functionId}-ServiceBusTrigger-{_entityPath}".ToLower(CultureInfo.InvariantCulture), functionId); _logger = loggerFactory.CreateLogger(); - _nextWarningTime = DateTime.UtcNow; } public ScaleMonitorDescriptor Descriptor @@ -57,140 +54,7 @@ async Task IScaleMonitor.GetMetricsAsync() public async Task GetMetricsAsync() { - ServiceBusReceivedMessage activeMessage = null; - string entityName = _serviceBusEntityType == ServiceBusEntityType.Queue ? "queue" : "topic"; - - try - { - // Do a first attempt to peek one message from the head of the queue - var peekedMessage = await _receiver.Value.PeekMessageAsync(fromSequenceNumber: 0).ConfigureAwait(false); - if (peekedMessage == null) - { - // ignore it. The Get[Queue|Topic]MetricsAsync methods deal with activeMessage being null - } - else if (MessageIsActive(peekedMessage)) - { - activeMessage = peekedMessage; - } - else - { - // Do another attempt to peek ten message from last peek sequence number - var peekedMessages = await _receiver.Value.PeekMessagesAsync(10, fromSequenceNumber: peekedMessage.SequenceNumber).ConfigureAwait(false); - foreach (var receivedMessage in peekedMessages ) - { - if (MessageIsActive(receivedMessage)) - { - activeMessage = receivedMessage; - break; - } - } - - // Batch contains messages but none are active in the peeked batch - if (peekedMessages.Count > 0 && activeMessage == null) - { - _logger.LogDebug("{_serviceBusEntityType} {_entityPath} contains multiple messages but none are active in the peeked batch."); - } - } - - if (_serviceBusEntityType == ServiceBusEntityType.Queue) - { - return await GetQueueMetricsAsync(activeMessage).ConfigureAwait(false); - } - else - { - return await GetTopicMetricsAsync(activeMessage).ConfigureAwait(false); - } - } - catch (ServiceBusException ex) - when (ex.Reason == ServiceBusFailureReason.MessagingEntityNotFound) - { - _logger.LogWarning($"ServiceBus {entityName} '{_entityPath}' was not found."); - } - catch (UnauthorizedAccessException) // When manage claim is not used on Service Bus connection string - { - if (TimeToLogWarning()) - { - _logger.LogWarning($"Connection string does not have Manage claim for {entityName} '{_entityPath}'. Failed to get {entityName} description to " + - $"derive {entityName} length metrics. Falling back to using first message enqueued time."); - } - } - catch (Exception e) - { - _logger.LogWarning($"Error querying for Service Bus {entityName} scale status: {e.Message}"); - } - - // Path for connection strings with no manage claim - return CreateTriggerMetrics(activeMessage, 0, 0, 0, _isListeningOnDeadLetterQueue); - } - - private async Task GetQueueMetricsAsync(ServiceBusReceivedMessage message) - { - QueueRuntimeProperties queueRuntimeProperties; - QueueProperties queueProperties; - long activeMessageCount = 0, deadLetterCount = 0; - int partitionCount = 0; - - queueRuntimeProperties = await _administrationClient.Value.GetQueueRuntimePropertiesAsync(_entityPath).ConfigureAwait(false); - activeMessageCount = queueRuntimeProperties.ActiveMessageCount; - deadLetterCount = queueRuntimeProperties.DeadLetterMessageCount; - - // If partitioning is turned on, then Service Bus automatically partitions queues into 16 partitions - // See more information here: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-partitioning#standard - queueProperties = await _administrationClient.Value.GetQueueAsync(_entityPath).ConfigureAwait(false); - partitionCount = queueProperties.EnablePartitioning ? 16 : 0; - - return CreateTriggerMetrics(message, activeMessageCount, deadLetterCount, partitionCount, _isListeningOnDeadLetterQueue); - } - - private async Task GetTopicMetricsAsync(ServiceBusReceivedMessage message) - { - TopicProperties topicProperties; - SubscriptionRuntimeProperties subscriptionProperties; - string topicPath, subscriptionPath; - long activeMessageCount = 0, deadLetterCount = 0; - int partitionCount = 0; - - ServiceBusEntityPathHelper.ParseTopicAndSubscription(_entityPath, out topicPath, out subscriptionPath); - - subscriptionProperties = await _administrationClient.Value.GetSubscriptionRuntimePropertiesAsync(topicPath, subscriptionPath).ConfigureAwait(false); - activeMessageCount = subscriptionProperties.ActiveMessageCount; - deadLetterCount = subscriptionProperties.DeadLetterMessageCount; - - // If partitioning is turned on, then Service Bus automatically partitions queues into 16 partitions - // See more information here: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-partitioning#standard - topicProperties = await _administrationClient.Value.GetTopicAsync(topicPath).ConfigureAwait(false); - partitionCount = topicProperties.EnablePartitioning ? 16 : 0; - - return CreateTriggerMetrics(message, activeMessageCount, deadLetterCount, partitionCount, _isListeningOnDeadLetterQueue); - } - - internal static ServiceBusTriggerMetrics CreateTriggerMetrics(ServiceBusReceivedMessage message, long activeMessageCount, long deadLetterCount, int partitionCount, bool isListeningOnDeadLetterQueue) - { - long totalNewMessageCount = 0; - TimeSpan queueTime = TimeSpan.Zero; - - if (message != null) - { - queueTime = DateTimeOffset.UtcNow.Subtract(message.EnqueuedTime); - totalNewMessageCount = 1; // There's at least one if message != null. Default for connection string with no manage claim - } - - if ((!isListeningOnDeadLetterQueue && activeMessageCount > 0) || (isListeningOnDeadLetterQueue && deadLetterCount > 0)) - { - totalNewMessageCount = isListeningOnDeadLetterQueue ? deadLetterCount : activeMessageCount; - } - - return new ServiceBusTriggerMetrics - { - MessageCount = totalNewMessageCount, - PartitionCount = partitionCount, - QueueTime = queueTime - }; - } - - private static bool MessageIsActive(ServiceBusReceivedMessage message) - { - return message.State == ServiceBusMessageState.Active; + return await _serviceBusMetricsProvider.GetMetricsAsync().ConfigureAwait(false); } ScaleStatus IScaleMonitor.GetScaleStatus(ScaleStatusContext context) @@ -316,17 +180,6 @@ private ScaleStatus GetScaleStatusCore(int workerCount, ServiceBusTriggerMetrics return status; } - private bool TimeToLogWarning() - { - DateTime currentTime = DateTime.UtcNow; - bool timeToLog = currentTime >= _nextWarningTime; - if (timeToLog) - { - _nextWarningTime = currentTime.AddHours(1); - } - return timeToLog; - } - private static bool IsTrueForLastN(IList samples, int count, Func predicate) { // Walks through the list from left to right starting at len(samples) - count. @@ -341,4 +194,4 @@ private static bool IsTrueForLastN(IList samples, int return true; } } -} +} \ No newline at end of file diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusTargetScaler.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusTargetScaler.cs new file mode 100644 index 000000000000..eda2f1092df9 --- /dev/null +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusTargetScaler.cs @@ -0,0 +1,106 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using Azure.Messaging.ServiceBus; +using Azure.Messaging.ServiceBus.Administration; +using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Azure.WebJobs.ServiceBus; +using Microsoft.Azure.WebJobs.ServiceBus.Listeners; +using Microsoft.Extensions.Logging; +using System; +using System.Globalization; +using System.Threading.Tasks; + +namespace Microsoft.Azure.WebJobs.Extensions.ServiceBus.Listeners +{ + internal class ServiceBusTargetScaler : ITargetScaler + { + private readonly string _functionId; + private readonly ServiceBusMetricsProvider _serviceBusMetricsProvider; + private readonly ServiceBusOptions _options; + private readonly bool _isSessionsEnabled; + private readonly bool _singleDispatch; + private readonly TargetScalerDescriptor _targetScalerDescriptor; + private readonly string _entityPath; + private readonly ILogger _logger; + + public ServiceBusTargetScaler( + string functionId, + string entityPath, + ServiceBusEntityType entityType, + Lazy receiver, + Lazy administrationClient, + ServiceBusOptions options, + bool isSessionsEnabled, + bool singleDispatch, + ILoggerFactory loggerFactory + ) + { + _functionId = functionId; + _serviceBusMetricsProvider = new ServiceBusMetricsProvider(entityPath, entityType, receiver, administrationClient, loggerFactory); + _entityPath = entityPath; + _targetScalerDescriptor = new TargetScalerDescriptor(functionId); + _logger = loggerFactory.CreateLogger(); + _options = options; + _singleDispatch = singleDispatch; + _isSessionsEnabled = isSessionsEnabled; + } + + public TargetScalerDescriptor TargetScalerDescriptor => _targetScalerDescriptor; + + public async Task GetScaleResultAsync(TargetScalerContext context) + { + try + { + long activeMessageCount = await _serviceBusMetricsProvider.GetMessageCountAsync().ConfigureAwait(false); + return GetScaleResultInternal(context, activeMessageCount); + } + catch (UnauthorizedAccessException ex) + { + throw new NotSupportedException("Target scaler is not supported.", ex); + } + } + + internal TargetScalerResult GetScaleResultInternal(TargetScalerContext context, long messageCount) + { + int concurrency; + + if (!context.InstanceConcurrency.HasValue) + { + if (!_singleDispatch) + { + concurrency = _options.MaxMessageBatchSize; + } + else + { + if (_isSessionsEnabled) + { + concurrency = _options.MaxConcurrentSessions; + } + else + { + concurrency = _options.MaxConcurrentCalls; + } + } + } + else + { + concurrency = context.InstanceConcurrency.Value; + } + + if (concurrency < 1) + { + throw new ArgumentOutOfRangeException($"Unexpected concurrency='{concurrency}' - the value must be > 0."); + } + + int targetWorkerCount = (int)Math.Ceiling(messageCount / (decimal)concurrency); + _logger.LogInformation($"Target worker count for function '{_functionId}' is '{targetWorkerCount}' (EntityPath='{_entityPath}', MessageCount ='{messageCount}', Concurrency='{concurrency}')."); + + return new TargetScalerResult + { + TargetWorkerCount = targetWorkerCount + }; + } + } +} diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj index 2dfcc3a79b85..25062096cf33 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Microsoft.Azure.WebJobs.Extensions.ServiceBus.csproj @@ -1,4 +1,4 @@ - + netstandard2.0 diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Listeners/ServiceBusMetricsProviderTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Listeners/ServiceBusMetricsProviderTests.cs new file mode 100644 index 000000000000..014c23b21d4f --- /dev/null +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Listeners/ServiceBusMetricsProviderTests.cs @@ -0,0 +1,53 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using Azure.Messaging.ServiceBus; +using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Listeners; +using NUnit.Framework; +using System; + +namespace Microsoft.Azure.WebJobs.Extensions.ServiceBus.Tests.Listeners +{ + internal class ServiceBusMetricsProviderTests + { + [Test] + public void GetMetrics_ReturnsExpectedResult() + { + var utcNow = DateTimeOffset.UtcNow.Subtract(TimeSpan.FromSeconds(10)); + + var message = ServiceBusModelFactory.ServiceBusReceivedMessage(enqueuedTime: utcNow); + + // Test base case + var metrics = ServiceBusMetricsProvider.CreateTriggerMetrics(null, 0, 0, 0, false); + + Assert.AreEqual(0, metrics.PartitionCount); + Assert.AreEqual(0, metrics.MessageCount); + Assert.AreEqual(TimeSpan.FromSeconds(0), metrics.QueueTime); + Assert.AreNotEqual(default(DateTime), metrics.Timestamp); + + // Test messages on main queue + metrics = ServiceBusMetricsProvider.CreateTriggerMetrics(message, 10, 0, 0, false); + + Assert.AreEqual(0, metrics.PartitionCount); + Assert.AreEqual(10, metrics.MessageCount); + Assert.That(metrics.QueueTime, Is.GreaterThanOrEqualTo(TimeSpan.FromSeconds(10))); + Assert.AreNotEqual(default(DateTime), metrics.Timestamp); + + // Test listening on dead letter queue + metrics = ServiceBusMetricsProvider.CreateTriggerMetrics(message, 10, 100, 0, true); + + Assert.AreEqual(0, metrics.PartitionCount); + Assert.AreEqual(100, metrics.MessageCount); + Assert.That(metrics.QueueTime, Is.GreaterThanOrEqualTo(TimeSpan.FromSeconds(10))); + Assert.AreNotEqual(default(DateTime), metrics.Timestamp); + + // Test partitions + metrics = ServiceBusMetricsProvider.CreateTriggerMetrics(null, 0, 0, 16, false); + + Assert.AreEqual(16, metrics.PartitionCount); + Assert.AreEqual(0, metrics.MessageCount); + Assert.AreEqual(TimeSpan.FromSeconds(0), metrics.QueueTime); + Assert.AreNotEqual(default(DateTime), metrics.Timestamp); + } + } +} diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Listeners/ServiceBusScaleMonitorTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Listeners/ServiceBusScaleMonitorTests.cs index 72f5cb6e7d54..b335d819672c 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Listeners/ServiceBusScaleMonitorTests.cs +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Listeners/ServiceBusScaleMonitorTests.cs @@ -9,6 +9,7 @@ using Azure.Messaging.ServiceBus; using Azure.Messaging.ServiceBus.Administration; using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Config; +using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Listeners; using Microsoft.Azure.WebJobs.Host.Executors; using Microsoft.Azure.WebJobs.Host.Scale; using Microsoft.Azure.WebJobs.Host.TestCommon; @@ -114,46 +115,6 @@ public void ScaleMonitorDescriptor_ReturnsExpectedValue() Assert.AreEqual($"{_functionId}-ServiceBusTrigger-{_entityPath}".ToLower(), _scaleMonitor.Descriptor.Id); } - [Test] - public void GetMetrics_ReturnsExpectedResult() - { - var utcNow = DateTimeOffset.UtcNow.Subtract(TimeSpan.FromSeconds(10)); - - var message = ServiceBusModelFactory.ServiceBusReceivedMessage(enqueuedTime: utcNow); - - // Test base case - var metrics = ServiceBusScaleMonitor.CreateTriggerMetrics(null, 0, 0, 0, false); - - Assert.AreEqual(0, metrics.PartitionCount); - Assert.AreEqual(0, metrics.MessageCount); - Assert.AreEqual(TimeSpan.FromSeconds(0), metrics.QueueTime); - Assert.AreNotEqual(default(DateTime), metrics.Timestamp); - - // Test messages on main queue - metrics = ServiceBusScaleMonitor.CreateTriggerMetrics(message, 10, 0, 0, false); - - Assert.AreEqual(0, metrics.PartitionCount); - Assert.AreEqual(10, metrics.MessageCount); - Assert.That(metrics.QueueTime, Is.GreaterThanOrEqualTo(TimeSpan.FromSeconds(10))); - Assert.AreNotEqual(default(DateTime), metrics.Timestamp); - - // Test listening on dead letter queue - metrics = ServiceBusScaleMonitor.CreateTriggerMetrics(message, 10, 100, 0, true); - - Assert.AreEqual(0, metrics.PartitionCount); - Assert.AreEqual(100, metrics.MessageCount); - Assert.That(metrics.QueueTime, Is.GreaterThanOrEqualTo(TimeSpan.FromSeconds(10))); - Assert.AreNotEqual(default(DateTime), metrics.Timestamp); - - // Test partitions - metrics = ServiceBusScaleMonitor.CreateTriggerMetrics(null, 0, 0, 16, false); - - Assert.AreEqual(16, metrics.PartitionCount); - Assert.AreEqual(0, metrics.MessageCount); - Assert.AreEqual(TimeSpan.FromSeconds(0), metrics.QueueTime); - Assert.AreNotEqual(default(DateTime), metrics.Timestamp); - } - [Test] public async Task GetMetrics_IgnoresScheduledMessages() { diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Listeners/ServiceBusTargetScalerTests.cs b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Listeners/ServiceBusTargetScalerTests.cs new file mode 100644 index 000000000000..567d5a5e975a --- /dev/null +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Listeners/ServiceBusTargetScalerTests.cs @@ -0,0 +1,64 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using Microsoft.Azure.WebJobs.Extensions.ServiceBus.Listeners; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Azure.WebJobs.Host.TestCommon; +using Microsoft.Azure.WebJobs.ServiceBus; +using Microsoft.Extensions.Logging; +using NUnit.Framework; + +namespace Microsoft.Azure.WebJobs.Extensions.ServiceBus.Tests.Listeners +{ + public class ServiceBusTargetScalerTests + { + private ILoggerFactory _loggerFactory; + private TestLoggerProvider _loggerProvider; + + [SetUp] + public void Setup() + { + _loggerFactory = new LoggerFactory(); + _loggerProvider = new TestLoggerProvider(); + _loggerFactory.AddProvider(_loggerProvider); + } + + [TestCase(100, false, true, null, 6)] + [TestCase(100, true, true, null, 4)] + [TestCase(100, false, true, 19, 6)] + [TestCase(100, false, false, null, 3)] + public void ServiceBusTargetScaler_Returns_Expected(int messageCount, bool isSessionEnabled, bool singleDispatch, int? concurrency,int expected) + { + ServiceBusOptions options = new ServiceBusOptions + { + MaxConcurrentCalls = 19, + MaxConcurrentSessions = 29, + MaxMessageBatchSize = 39 + }; + + TargetScalerContext context = new TargetScalerContext + { + InstanceConcurrency = concurrency + }; + + _loggerFactory = new LoggerFactory(); + _loggerProvider = new TestLoggerProvider(); + _loggerFactory.AddProvider(_loggerProvider); + + ServiceBusTargetScaler targetScaler = new ServiceBusTargetScaler( + "functionId", + "entityPath", + ServiceBusEntityType.Queue, + null, + null, + options, + isSessionEnabled, + singleDispatch, + _loggerFactory + ); + TargetScalerResult result = targetScaler.GetScaleResultInternal(context, messageCount); + + Assert.AreEqual(result.TargetWorkerCount, expected); + } + } +} diff --git a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Microsoft.Azure.WebJobs.Extensions.ServiceBus.Tests.csproj b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Microsoft.Azure.WebJobs.Extensions.ServiceBus.Tests.csproj index f25fcfed268b..99bb3568a7b2 100644 --- a/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Microsoft.Azure.WebJobs.Extensions.ServiceBus.Tests.csproj +++ b/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/tests/Microsoft.Azure.WebJobs.Extensions.ServiceBus.Tests.csproj @@ -32,4 +32,4 @@ - \ No newline at end of file + \ No newline at end of file