From 977a4db4c59f3ab2ddebef9c5c98f39375845a0c Mon Sep 17 00:00:00 2001 From: Vincent Chiang Date: Tue, 1 Nov 2022 10:45:31 -0700 Subject: [PATCH 01/28] separate out scale monitor code from cosmosdbtriggerlistener --- src/ExtensionsSample/ExtensionsSample.csproj | 2 +- .../Trigger/CosmosDBScaleMonitor.cs | 268 ++++++++++++++++++ .../Trigger/CosmosDBTriggerMetrics.cs | 2 +- 3 files changed, 270 insertions(+), 2 deletions(-) create mode 100644 src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs diff --git a/src/ExtensionsSample/ExtensionsSample.csproj b/src/ExtensionsSample/ExtensionsSample.csproj index b1831bdc5..3cb94854b 100644 --- a/src/ExtensionsSample/ExtensionsSample.csproj +++ b/src/ExtensionsSample/ExtensionsSample.csproj @@ -2,7 +2,7 @@ Exe - netcoreapp2.1 + netcoreapp3.1 latest $(NoWarn);8002 diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs new file mode 100644 index 000000000..d9eb8f37c --- /dev/null +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs @@ -0,0 +1,268 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Net; +using System.Threading.Tasks; +using Microsoft.Azure.Cosmos; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Extensions.Logging; + +namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger +{ + public class CosmosDBScaleMonitor : IScaleMonitor + { + private readonly ILogger _logger; + private readonly string _functionId; + private readonly Container _monitoredContainer; + private readonly Container _leaseContainer; + private readonly string _processorName; + private readonly ScaleMonitorDescriptor _scaleMonitorDescriptor; + + private static readonly Dictionary KnownDocumentClientErrors = new Dictionary() + { + { "Resource Not Found", "Please check that the CosmosDB container and leases container exist and are listed correctly in Functions config files." }, + { "The input authorization token can't serve the request", string.Empty }, + { "The MAC signature found in the HTTP request is not the same", string.Empty }, + { "Service is currently unavailable.", string.Empty }, + { "Entity with the specified id does not exist in the system.", string.Empty }, + { "Subscription owning the database account is disabled.", string.Empty }, + { "Request rate is large", string.Empty }, + { "PartitionKey value must be supplied for this operation.", "We do not support lease containers with partitions at this time. Please create a new lease collection without partitions." }, + { "The remote name could not be resolved:", string.Empty }, + { "Owner resource does not exist", string.Empty }, + { "The specified document collection is invalid", string.Empty } + }; + + public CosmosDBScaleMonitor(string functionId, ILoggerFactory loggerFactory, Container monitoredContainer, Container leaseContainer, string processorName) + { + _logger = loggerFactory.CreateLogger(); + _functionId = functionId; + _monitoredContainer = monitoredContainer; + _leaseContainer = leaseContainer; + _processorName = processorName; + _scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{_functionId}-CosmosDBTrigger-{_monitoredContainer.Database.Id}-{_monitoredContainer.Id}".ToLower()); + } + + public ScaleMonitorDescriptor Descriptor => _scaleMonitorDescriptor; + + public async Task GetMetricsAsync() + { + int partitionCount = 0; + long remainingWork = 0; + + try + { + List partitionWorkList = new List(); + ChangeFeedEstimator estimator = _monitoredContainer.GetChangeFeedEstimator(_processorName, _leaseContainer); + using (FeedIterator iterator = estimator.GetCurrentStateIterator()) + { + while (iterator.HasMoreResults) + { + FeedResponse response = await iterator.ReadNextAsync(); + partitionWorkList.AddRange(response); + } + } + + partitionCount = partitionWorkList.Count; + remainingWork = partitionWorkList.Sum(item => item.EstimatedLag); + } + catch (Exception e) when (e is CosmosException || e is InvalidOperationException) + { + if (!TryHandleCosmosException(e)) + { + _logger.LogWarning(Events.OnScaling, "Unable to handle {0}: {1}", e.GetType().ToString(), e.Message); + if (e is InvalidOperationException) + { + throw; + } + } + } + catch (System.Net.Http.HttpRequestException e) + { + string errormsg; + + var webException = e.InnerException as WebException; + if (webException != null && + webException.Status == WebExceptionStatus.ProtocolError) + { + string statusCode = ((HttpWebResponse)webException.Response).StatusCode.ToString(); + string statusDesc = ((HttpWebResponse)webException.Response).StatusDescription; + errormsg = string.Format("CosmosDBTrigger status {0}: {1}.", statusCode, statusDesc); + } + else if (webException != null && + webException.Status == WebExceptionStatus.NameResolutionFailure) + { + errormsg = string.Format("CosmosDBTrigger Exception message: {0}.", webException.Message); + } + else + { + errormsg = e.ToString(); + } + + _logger.LogWarning(Events.OnScaling, errormsg); + } + + return new CosmosDBTriggerMetrics + { + Timestamp = DateTime.UtcNow, + PartitionCount = partitionCount, + RemainingWork = remainingWork + }; + } + + public ScaleStatus GetScaleStatus(ScaleStatusContext context) + { + return GetScaleStatusCore(context.WorkerCount, context.Metrics?.Cast().ToArray()); + } + + async Task IScaleMonitor.GetMetricsAsync() + { + return await GetMetricsAsync(); + } + + public ScaleStatus GetScaleStatus(ScaleStatusContext context) + { + return GetScaleStatusCore(context.WorkerCount, context.Metrics?.Cast().ToArray()); + } + + private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[] metrics) + { + ScaleStatus status = new ScaleStatus + { + Vote = ScaleVote.None + }; + + const int NumberOfSamplesToConsider = 5; + + // Unable to determine the correct vote with no metrics. + if (metrics == null) + { + return status; + } + + // We shouldn't assign more workers than there are partitions (Cosmos DB, Event Hub, Service Bus Queue/Topic) + // This check is first, because it is independent of load or number of samples. + int partitionCount = metrics.Length > 0 ? metrics.Last().PartitionCount : 0; + if (partitionCount > 0 && partitionCount < workerCount) + { + status.Vote = ScaleVote.ScaleIn; + _logger.LogInformation(Events.OnScaling, string.Format($"WorkerCount ({workerCount}) > PartitionCount ({partitionCount}).")); + _logger.LogInformation(Events.OnScaling, string.Format($"Number of instances ({workerCount}) is too high relative to number " + + $"of partitions for container ({_monitoredContainer.Id}, {partitionCount}).")); + return status; + } + + // At least 5 samples are required to make a scale decision for the rest of the checks. + if (metrics.Length < NumberOfSamplesToConsider) + { + return status; + } + + // Maintain a minimum ratio of 1 worker per 1,000 items of remaining work. + long latestRemainingWork = metrics.Last().RemainingWork; + if (latestRemainingWork > workerCount * 1000) + { + status.Vote = ScaleVote.ScaleOut; + _logger.LogInformation(Events.OnScaling, string.Format($"RemainingWork ({latestRemainingWork}) > WorkerCount ({workerCount}) * 1,000.")); + _logger.LogInformation(Events.OnScaling, string.Format($"Remaining work for container ({_monitoredContainer.Id}, {latestRemainingWork}) " + + $"is too high relative to the number of instances ({workerCount}).")); + return status; + } + + bool documentsWaiting = metrics.All(m => m.RemainingWork > 0); + if (documentsWaiting && partitionCount > 0 && partitionCount > workerCount) + { + status.Vote = ScaleVote.ScaleOut; + _logger.LogInformation(Events.OnScaling, string.Format($"CosmosDB container '{_monitoredContainer.Id}' has documents waiting to be processed.")); + _logger.LogInformation(Events.OnScaling, string.Format($"There are {workerCount} instances relative to {partitionCount} partitions.")); + return status; + } + + // Check to see if the trigger source has been empty for a while. Only if all trigger sources are empty do we scale down. + bool isIdle = metrics.All(m => m.RemainingWork == 0); + if (isIdle) + { + status.Vote = ScaleVote.ScaleIn; + _logger.LogInformation(Events.OnScaling, string.Format($"'{_monitoredContainer.Id}' is idle.")); + return status; + } + + // Samples are in chronological order. Check for a continuous increase in work remaining. + // If detected, this results in an automatic scale out for the site container. + bool remainingWorkIncreasing = + IsTrueForLast( + metrics, + NumberOfSamplesToConsider, + (prev, next) => prev.RemainingWork < next.RemainingWork) && metrics[0].RemainingWork > 0; + if (remainingWorkIncreasing) + { + status.Vote = ScaleVote.ScaleOut; + _logger.LogInformation(Events.OnScaling, $"Remaining work is increasing for '{_monitoredContainer.Id}'."); + return status; + } + + bool remainingWorkDecreasing = + IsTrueForLast( + metrics, + NumberOfSamplesToConsider, + (prev, next) => prev.RemainingWork > next.RemainingWork); + if (remainingWorkDecreasing) + { + status.Vote = ScaleVote.ScaleIn; + _logger.LogInformation(Events.OnScaling, $"Remaining work is decreasing for '{_monitoredContainer.Id}'."); + return status; + } + + _logger.LogInformation(Events.OnScaling, $"CosmosDB container '{_monitoredContainer.Id}' is steady."); + + return status; + } + + // Since all exceptions in the Cosmos client are thrown as CosmosExceptions, we have to parse their error strings because we dont have access to the internal types + private bool TryHandleCosmosException(Exception exception) + { + string errormsg = null; + string exceptionMessage = exception.Message; + + if (!string.IsNullOrEmpty(exceptionMessage)) + { + foreach (KeyValuePair exceptionString in KnownDocumentClientErrors) + { + if (exceptionMessage.IndexOf(exceptionString.Key, StringComparison.OrdinalIgnoreCase) >= 0) + { + errormsg = !string.IsNullOrEmpty(exceptionString.Value) ? exceptionString.Value : exceptionMessage; + } + } + } + + if (!string.IsNullOrEmpty(errormsg)) + { + _logger.LogWarning(errormsg); + return true; + } + + return false; + } + + private static bool IsTrueForLast(IList metrics, int count, Func predicate) + { + Debug.Assert(count > 1, "count must be greater than 1."); + Debug.Assert(count <= metrics.Count, "count must be less than or equal to the list size."); + + // Walks through the list from left to right starting at len(samples) - count. + for (int i = metrics.Count - count; i < metrics.Count - 1; i++) + { + if (!predicate(metrics[i], metrics[i + 1])) + { + return false; + } + } + + return true; + } + } +} diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerMetrics.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerMetrics.cs index 65c3c5d29..d45cbcb0e 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerMetrics.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerMetrics.cs @@ -5,7 +5,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB { - internal class CosmosDBTriggerMetrics : ScaleMetrics + public class CosmosDBTriggerMetrics : ScaleMetrics { public int PartitionCount { get; set; } From 2d786dbe94ffa343e7de4311a96d87bd69e1a194 Mon Sep 17 00:00:00 2001 From: Vincent Chiang Date: Tue, 1 Nov 2022 11:04:03 -0700 Subject: [PATCH 02/28] create CosmosDBMetricsProvider --- .../Trigger/CosmosDBMetricsProvider.cs | 135 ++++++++++++++++++ .../Trigger/CosmosDBScaleMonitor.cs | 120 ++-------------- 2 files changed, 143 insertions(+), 112 deletions(-) create mode 100644 src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBMetricsProvider.cs diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBMetricsProvider.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBMetricsProvider.cs new file mode 100644 index 000000000..39e31eebc --- /dev/null +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBMetricsProvider.cs @@ -0,0 +1,135 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Threading.Tasks; +using Microsoft.Azure.Cosmos; +using Microsoft.Extensions.Logging; + +namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger +{ + internal class CosmosDBMetricsProvider + { + private readonly ILogger _logger; + private readonly Container _monitoredContainer; + private readonly Container _leaseContainer; + private readonly string _processorName; + + private static readonly Dictionary KnownDocumentClientErrors = new Dictionary() + { + { "Resource Not Found", "Please check that the CosmosDB container and leases container exist and are listed correctly in Functions config files." }, + { "The input authorization token can't serve the request", string.Empty }, + { "The MAC signature found in the HTTP request is not the same", string.Empty }, + { "Service is currently unavailable.", string.Empty }, + { "Entity with the specified id does not exist in the system.", string.Empty }, + { "Subscription owning the database account is disabled.", string.Empty }, + { "Request rate is large", string.Empty }, + { "PartitionKey value must be supplied for this operation.", "We do not support lease containers with partitions at this time. Please create a new lease collection without partitions." }, + { "The remote name could not be resolved:", string.Empty }, + { "Owner resource does not exist", string.Empty }, + { "The specified document collection is invalid", string.Empty } + }; + + public CosmosDBMetricsProvider(ILoggerFactory loggerFactory, Container monitoredContainer, Container leaseContainer, string processorName) + { + _logger = loggerFactory.CreateLogger(); + _monitoredContainer = monitoredContainer; + _leaseContainer = leaseContainer; + _processorName = processorName; + } + + public async Task GetMetricsAsync() + { + int partitionCount = 0; + long remainingWork = 0; + + try + { + List partitionWorkList = new List(); + ChangeFeedEstimator estimator = _monitoredContainer.GetChangeFeedEstimator(_processorName, _leaseContainer); + using (FeedIterator iterator = estimator.GetCurrentStateIterator()) + { + while (iterator.HasMoreResults) + { + FeedResponse response = await iterator.ReadNextAsync(); + partitionWorkList.AddRange(response); + } + } + + partitionCount = partitionWorkList.Count; + remainingWork = partitionWorkList.Sum(item => item.EstimatedLag); + } + catch (Exception e) when (e is CosmosException || e is InvalidOperationException) + { + if (!TryHandleCosmosException(e)) + { + _logger.LogWarning(Events.OnScaling, "Unable to handle {0}: {1}", e.GetType().ToString(), e.Message); + if (e is InvalidOperationException) + { + throw; + } + } + } + catch (System.Net.Http.HttpRequestException e) + { + string errormsg; + + var webException = e.InnerException as WebException; + if (webException != null && + webException.Status == WebExceptionStatus.ProtocolError) + { + string statusCode = ((HttpWebResponse)webException.Response).StatusCode.ToString(); + string statusDesc = ((HttpWebResponse)webException.Response).StatusDescription; + errormsg = string.Format("CosmosDBTrigger status {0}: {1}.", statusCode, statusDesc); + } + else if (webException != null && + webException.Status == WebExceptionStatus.NameResolutionFailure) + { + errormsg = string.Format("CosmosDBTrigger Exception message: {0}.", webException.Message); + } + else + { + errormsg = e.ToString(); + } + + _logger.LogWarning(Events.OnScaling, errormsg); + } + + return new CosmosDBTriggerMetrics + { + Timestamp = DateTime.UtcNow, + PartitionCount = partitionCount, + RemainingWork = remainingWork + }; + } + + // Since all exceptions in the Cosmos client are thrown as CosmosExceptions, we have to parse their error strings because we dont have access to the internal types + private bool TryHandleCosmosException(Exception exception) + { + string errormsg = null; + string exceptionMessage = exception.Message; + + if (!string.IsNullOrEmpty(exceptionMessage)) + { + foreach (KeyValuePair exceptionString in KnownDocumentClientErrors) + { + if (exceptionMessage.IndexOf(exceptionString.Key, StringComparison.OrdinalIgnoreCase) >= 0) + { + errormsg = !string.IsNullOrEmpty(exceptionString.Value) ? exceptionString.Value : exceptionMessage; + } + } + } + + if (!string.IsNullOrEmpty(errormsg)) + { + _logger.LogWarning(errormsg); + return true; + } + + return false; + } + } +} diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs index d9eb8f37c..efbfc7dad 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs @@ -5,7 +5,6 @@ using System.Collections.Generic; using System.Diagnostics; using System.Linq; -using System.Net; using System.Threading.Tasks; using Microsoft.Azure.Cosmos; using Microsoft.Azure.WebJobs.Host.Scale; @@ -18,102 +17,20 @@ public class CosmosDBScaleMonitor : IScaleMonitor private readonly ILogger _logger; private readonly string _functionId; private readonly Container _monitoredContainer; - private readonly Container _leaseContainer; - private readonly string _processorName; private readonly ScaleMonitorDescriptor _scaleMonitorDescriptor; - - private static readonly Dictionary KnownDocumentClientErrors = new Dictionary() - { - { "Resource Not Found", "Please check that the CosmosDB container and leases container exist and are listed correctly in Functions config files." }, - { "The input authorization token can't serve the request", string.Empty }, - { "The MAC signature found in the HTTP request is not the same", string.Empty }, - { "Service is currently unavailable.", string.Empty }, - { "Entity with the specified id does not exist in the system.", string.Empty }, - { "Subscription owning the database account is disabled.", string.Empty }, - { "Request rate is large", string.Empty }, - { "PartitionKey value must be supplied for this operation.", "We do not support lease containers with partitions at this time. Please create a new lease collection without partitions." }, - { "The remote name could not be resolved:", string.Empty }, - { "Owner resource does not exist", string.Empty }, - { "The specified document collection is invalid", string.Empty } - }; + private readonly CosmosDBMetricsProvider _cosmosDBMetricsProvider; public CosmosDBScaleMonitor(string functionId, ILoggerFactory loggerFactory, Container monitoredContainer, Container leaseContainer, string processorName) { _logger = loggerFactory.CreateLogger(); _functionId = functionId; _monitoredContainer = monitoredContainer; - _leaseContainer = leaseContainer; - _processorName = processorName; _scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{_functionId}-CosmosDBTrigger-{_monitoredContainer.Database.Id}-{_monitoredContainer.Id}".ToLower()); + _cosmosDBMetricsProvider = new CosmosDBMetricsProvider(loggerFactory, monitoredContainer, leaseContainer, processorName); } public ScaleMonitorDescriptor Descriptor => _scaleMonitorDescriptor; - public async Task GetMetricsAsync() - { - int partitionCount = 0; - long remainingWork = 0; - - try - { - List partitionWorkList = new List(); - ChangeFeedEstimator estimator = _monitoredContainer.GetChangeFeedEstimator(_processorName, _leaseContainer); - using (FeedIterator iterator = estimator.GetCurrentStateIterator()) - { - while (iterator.HasMoreResults) - { - FeedResponse response = await iterator.ReadNextAsync(); - partitionWorkList.AddRange(response); - } - } - - partitionCount = partitionWorkList.Count; - remainingWork = partitionWorkList.Sum(item => item.EstimatedLag); - } - catch (Exception e) when (e is CosmosException || e is InvalidOperationException) - { - if (!TryHandleCosmosException(e)) - { - _logger.LogWarning(Events.OnScaling, "Unable to handle {0}: {1}", e.GetType().ToString(), e.Message); - if (e is InvalidOperationException) - { - throw; - } - } - } - catch (System.Net.Http.HttpRequestException e) - { - string errormsg; - - var webException = e.InnerException as WebException; - if (webException != null && - webException.Status == WebExceptionStatus.ProtocolError) - { - string statusCode = ((HttpWebResponse)webException.Response).StatusCode.ToString(); - string statusDesc = ((HttpWebResponse)webException.Response).StatusDescription; - errormsg = string.Format("CosmosDBTrigger status {0}: {1}.", statusCode, statusDesc); - } - else if (webException != null && - webException.Status == WebExceptionStatus.NameResolutionFailure) - { - errormsg = string.Format("CosmosDBTrigger Exception message: {0}.", webException.Message); - } - else - { - errormsg = e.ToString(); - } - - _logger.LogWarning(Events.OnScaling, errormsg); - } - - return new CosmosDBTriggerMetrics - { - Timestamp = DateTime.UtcNow, - PartitionCount = partitionCount, - RemainingWork = remainingWork - }; - } - public ScaleStatus GetScaleStatus(ScaleStatusContext context) { return GetScaleStatusCore(context.WorkerCount, context.Metrics?.Cast().ToArray()); @@ -121,7 +38,12 @@ public ScaleStatus GetScaleStatus(ScaleStatusContext con async Task IScaleMonitor.GetMetricsAsync() { - return await GetMetricsAsync(); + return await GetMetricsAsync().ConfigureAwait(false); + } + + public async Task GetMetricsAsync() + { + return await _cosmosDBMetricsProvider.GetMetricsAsync().ConfigureAwait(false); } public ScaleStatus GetScaleStatus(ScaleStatusContext context) @@ -222,32 +144,6 @@ private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[] return status; } - // Since all exceptions in the Cosmos client are thrown as CosmosExceptions, we have to parse their error strings because we dont have access to the internal types - private bool TryHandleCosmosException(Exception exception) - { - string errormsg = null; - string exceptionMessage = exception.Message; - - if (!string.IsNullOrEmpty(exceptionMessage)) - { - foreach (KeyValuePair exceptionString in KnownDocumentClientErrors) - { - if (exceptionMessage.IndexOf(exceptionString.Key, StringComparison.OrdinalIgnoreCase) >= 0) - { - errormsg = !string.IsNullOrEmpty(exceptionString.Value) ? exceptionString.Value : exceptionMessage; - } - } - } - - if (!string.IsNullOrEmpty(errormsg)) - { - _logger.LogWarning(errormsg); - return true; - } - - return false; - } - private static bool IsTrueForLast(IList metrics, int count, Func predicate) { Debug.Assert(count > 1, "count must be greater than 1."); From 541727933bd47e22b9aa67366ce406091a3bc741 Mon Sep 17 00:00:00 2001 From: Vincent Chiang Date: Tue, 1 Nov 2022 13:06:08 -0700 Subject: [PATCH 03/28] fixing unit tests --- .../Trigger/CosmosDBTriggerListener.cs | 239 +----------------- .../Trigger/CosmosDBListenerTests.cs | 25 +- .../WebJobs.Extensions.CosmosDB.Tests.csproj | 2 +- 3 files changed, 22 insertions(+), 244 deletions(-) diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs index 12dab1e97..5a564f801 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs @@ -3,12 +3,11 @@ using System; using System.Collections.Generic; -using System.Diagnostics; -using System.Linq; using System.Net; using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos; +using Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger; using Microsoft.Azure.WebJobs.Host.Executors; using Microsoft.Azure.WebJobs.Host.Listeners; using Microsoft.Azure.WebJobs.Host.Scale; @@ -16,7 +15,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB { - internal class CosmosDBTriggerListener : IListener, IScaleMonitor + internal class CosmosDBTriggerListener : IListener, IScaleMonitorProvider { private const int ListenerNotRegistered = 0; private const int ListenerRegistering = 1; @@ -33,25 +32,11 @@ internal class CosmosDBTriggerListener : IListener, IScaleMonitor _cosmosDBScaleMonitor; private ChangeFeedProcessor _host; private ChangeFeedProcessorBuilder _hostBuilder; private int _listenerStatus; - private static readonly Dictionary KnownDocumentClientErrors = new Dictionary() - { - { "Resource Not Found", "Please check that the CosmosDB container and leases container exist and are listed correctly in Functions config files." }, - { "The input authorization token can't serve the request", string.Empty }, - { "The MAC signature found in the HTTP request is not the same", string.Empty }, - { "Service is currently unavailable.", string.Empty }, - { "Entity with the specified id does not exist in the system.", string.Empty }, - { "Subscription owning the database account is disabled.", string.Empty }, - { "Request rate is large", string.Empty }, - { "PartitionKey value must be supplied for this operation.", "We do not support lease containers with partitions at this time. Please create a new lease collection without partitions." }, - { "The remote name could not be resolved:", string.Empty }, - { "Owner resource does not exist", string.Empty }, - { "The specified document collection is invalid", string.Empty } - }; - public CosmosDBTriggerListener( ITriggeredFunctionExecutor executor, string functionId, @@ -73,6 +58,9 @@ public CosmosDBTriggerListener( this._healthMonitor = new CosmosDBTriggerHealthMonitor(logger); this._listenerLogDetails = $"prefix='{this._processorName}', monitoredContainer='{this._monitoredContainer.Id}', monitoredDatabase='{this._monitoredContainer.Database.Id}', " + $"leaseContainer='{this._leaseContainer.Id}', leaseDatabase='{this._leaseContainer.Database.Id}', functionId='{this._functionId}'"; + + // TODO: logger factory creation + this._cosmosDBScaleMonitor = new CosmosDBScaleMonitor(_functionId, new LoggerFactory(), _monitoredContainer, _leaseContainer, _processorName); } public ScaleMonitorDescriptor Descriptor => this._scaleMonitorDescriptor; @@ -232,220 +220,9 @@ private async Task ProcessChangesAsync(ChangeFeedProcessorContext context, IRead } } - public async Task GetMetricsAsync() - { - int partitionCount = 0; - long remainingWork = 0; - - try - { - List partitionWorkList = new List(); - ChangeFeedEstimator estimator = this._monitoredContainer.GetChangeFeedEstimator(this._processorName, this._leaseContainer); - using (FeedIterator iterator = estimator.GetCurrentStateIterator()) - { - while (iterator.HasMoreResults) - { - FeedResponse response = await iterator.ReadNextAsync(); - partitionWorkList.AddRange(response); - } - } - - partitionCount = partitionWorkList.Count; - remainingWork = partitionWorkList.Sum(item => item.EstimatedLag); - } - catch (Exception e) when (e is CosmosException || e is InvalidOperationException) - { - if (!TryHandleCosmosException(e)) - { - _logger.LogWarning(Events.OnScaling, "Unable to handle {0}: {1}", e.GetType().ToString(), e.Message); - if (e is InvalidOperationException) - { - throw; - } - } - } - catch (System.Net.Http.HttpRequestException e) - { - string errormsg; - - var webException = e.InnerException as WebException; - if (webException != null && - webException.Status == WebExceptionStatus.ProtocolError) - { - string statusCode = ((HttpWebResponse)webException.Response).StatusCode.ToString(); - string statusDesc = ((HttpWebResponse)webException.Response).StatusDescription; - errormsg = string.Format("CosmosDBTrigger status {0}: {1}.", statusCode, statusDesc); - } - else if (webException != null && - webException.Status == WebExceptionStatus.NameResolutionFailure) - { - errormsg = string.Format("CosmosDBTrigger Exception message: {0}.", webException.Message); - } - else - { - errormsg = e.ToString(); - } - - _logger.LogWarning(Events.OnScaling, errormsg); - } - - return new CosmosDBTriggerMetrics - { - Timestamp = DateTime.UtcNow, - PartitionCount = partitionCount, - RemainingWork = remainingWork - }; - } - - public ScaleStatus GetScaleStatus(ScaleStatusContext context) - { - return GetScaleStatusCore(context.WorkerCount, context.Metrics?.Cast().ToArray()); - } - - async Task IScaleMonitor.GetMetricsAsync() - { - return await GetMetricsAsync(); - } - - public ScaleStatus GetScaleStatus(ScaleStatusContext context) - { - return GetScaleStatusCore(context.WorkerCount, context.Metrics?.Cast().ToArray()); - } - - private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[] metrics) - { - ScaleStatus status = new ScaleStatus - { - Vote = ScaleVote.None - }; - - const int NumberOfSamplesToConsider = 5; - - // Unable to determine the correct vote with no metrics. - if (metrics == null) - { - return status; - } - - // We shouldn't assign more workers than there are partitions (Cosmos DB, Event Hub, Service Bus Queue/Topic) - // This check is first, because it is independent of load or number of samples. - int partitionCount = metrics.Length > 0 ? metrics.Last().PartitionCount : 0; - if (partitionCount > 0 && partitionCount < workerCount) - { - status.Vote = ScaleVote.ScaleIn; - _logger.LogInformation(Events.OnScaling, string.Format($"WorkerCount ({workerCount}) > PartitionCount ({partitionCount}).")); - _logger.LogInformation(Events.OnScaling, string.Format($"Number of instances ({workerCount}) is too high relative to number " + - $"of partitions for container ({this._monitoredContainer.Id}, {partitionCount}).")); - return status; - } - - // At least 5 samples are required to make a scale decision for the rest of the checks. - if (metrics.Length < NumberOfSamplesToConsider) - { - return status; - } - - // Maintain a minimum ratio of 1 worker per 1,000 items of remaining work. - long latestRemainingWork = metrics.Last().RemainingWork; - if (latestRemainingWork > workerCount * 1000) - { - status.Vote = ScaleVote.ScaleOut; - _logger.LogInformation(Events.OnScaling, string.Format($"RemainingWork ({latestRemainingWork}) > WorkerCount ({workerCount}) * 1,000.")); - _logger.LogInformation(Events.OnScaling, string.Format($"Remaining work for container ({this._monitoredContainer.Id}, {latestRemainingWork}) " + - $"is too high relative to the number of instances ({workerCount}).")); - return status; - } - - bool documentsWaiting = metrics.All(m => m.RemainingWork > 0); - if (documentsWaiting && partitionCount > 0 && partitionCount > workerCount) - { - status.Vote = ScaleVote.ScaleOut; - _logger.LogInformation(Events.OnScaling, string.Format($"CosmosDB container '{this._monitoredContainer.Id}' has documents waiting to be processed.")); - _logger.LogInformation(Events.OnScaling, string.Format($"There are {workerCount} instances relative to {partitionCount} partitions.")); - return status; - } - - // Check to see if the trigger source has been empty for a while. Only if all trigger sources are empty do we scale down. - bool isIdle = metrics.All(m => m.RemainingWork == 0); - if (isIdle) - { - status.Vote = ScaleVote.ScaleIn; - _logger.LogInformation(Events.OnScaling, string.Format($"'{this._monitoredContainer.Id}' is idle.")); - return status; - } - - // Samples are in chronological order. Check for a continuous increase in work remaining. - // If detected, this results in an automatic scale out for the site container. - bool remainingWorkIncreasing = - IsTrueForLast( - metrics, - NumberOfSamplesToConsider, - (prev, next) => prev.RemainingWork < next.RemainingWork) && metrics[0].RemainingWork > 0; - if (remainingWorkIncreasing) - { - status.Vote = ScaleVote.ScaleOut; - _logger.LogInformation(Events.OnScaling, $"Remaining work is increasing for '{this._monitoredContainer.Id}'."); - return status; - } - - bool remainingWorkDecreasing = - IsTrueForLast( - metrics, - NumberOfSamplesToConsider, - (prev, next) => prev.RemainingWork > next.RemainingWork); - if (remainingWorkDecreasing) - { - status.Vote = ScaleVote.ScaleIn; - _logger.LogInformation(Events.OnScaling, $"Remaining work is decreasing for '{this._monitoredContainer.Id}'."); - return status; - } - - _logger.LogInformation(Events.OnScaling, $"CosmosDB container '{this._monitoredContainer.Id}' is steady."); - - return status; - } - - // Since all exceptions in the Cosmos client are thrown as CosmosExceptions, we have to parse their error strings because we dont have access to the internal types - private bool TryHandleCosmosException(Exception exception) + public IScaleMonitor GetMonitor() { - string errormsg = null; - string exceptionMessage = exception.Message; - - if (!string.IsNullOrEmpty(exceptionMessage)) - { - foreach (KeyValuePair exceptionString in KnownDocumentClientErrors) - { - if (exceptionMessage.IndexOf(exceptionString.Key, StringComparison.OrdinalIgnoreCase) >= 0) - { - errormsg = !string.IsNullOrEmpty(exceptionString.Value) ? exceptionString.Value : exceptionMessage; - } - } - } - - if (!string.IsNullOrEmpty(errormsg)) - { - _logger.LogWarning(errormsg); - return true; - } - - return false; - } - - private static bool IsTrueForLast(IList metrics, int count, Func predicate) - { - Debug.Assert(count > 1, "count must be greater than 1."); - Debug.Assert(count <= metrics.Count, "count must be less than or equal to the list size."); - - // Walks through the list from left to right starting at len(samples) - count. - for (int i = metrics.Count - count; i < metrics.Count - 1; i++) - { - if (!predicate(metrics[i], metrics[i + 1])) - { - return false; - } - } - - return true; + return _cosmosDBScaleMonitor; } } } diff --git a/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBListenerTests.cs b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBListenerTests.cs index bc5cf0cfa..10115d2a1 100644 --- a/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBListenerTests.cs +++ b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBListenerTests.cs @@ -9,6 +9,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.Cosmos; +using Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger; using Microsoft.Azure.WebJobs.Extensions.Tests.Common; using Microsoft.Azure.WebJobs.Host.Executors; using Microsoft.Azure.WebJobs.Host.Scale; @@ -95,7 +96,7 @@ public async Task GetMetrics_ReturnsExpectedResult() .Setup(m => m.ReadNextAsync(It.IsAny())) .Returns(Task.FromResult(response.Object)); - var metrics = await _listener.GetMetricsAsync(); + var metrics = (CosmosDBTriggerMetrics)await _listener.GetMonitor().GetMetricsAsync(); Assert.Equal(0, metrics.PartitionCount); Assert.Equal(0, metrics.RemainingWork); @@ -116,7 +117,7 @@ public async Task GetMetrics_ReturnsExpectedResult() new ChangeFeedProcessorState("d", 5, string.Empty) }.GetEnumerator()); - metrics = await _listener.GetMetricsAsync(); + metrics = (CosmosDBTriggerMetrics)await _listener.GetMonitor().GetMetricsAsync(); Assert.Equal(4, metrics.PartitionCount); Assert.Equal(20, metrics.RemainingWork); @@ -157,7 +158,7 @@ public async Task GetMetrics_HandlesExceptions() .ThrowsAsync(new InvalidOperationException("Unknown")) .ThrowsAsync(new HttpRequestException("Uh oh", new System.Net.WebException("Uh oh again", WebExceptionStatus.NameResolutionFailure))); - var metrics = await _listener.GetMetricsAsync(); + var metrics = (CosmosDBTriggerMetrics)await _listener.GetMonitor().GetMetricsAsync(); Assert.Equal(0, metrics.PartitionCount); Assert.Equal(0, metrics.RemainingWork); @@ -167,13 +168,13 @@ public async Task GetMetrics_HandlesExceptions() Assert.Equal("Please check that the CosmosDB container and leases container exist and are listed correctly in Functions config files.", warning.FormattedMessage); _loggerProvider.ClearAllLogMessages(); - await Assert.ThrowsAsync(async () => await _listener.GetMetricsAsync()); + await Assert.ThrowsAsync(async () => await _listener.GetMonitor().GetMetricsAsync()); warning = _loggerProvider.GetAllLogMessages().Single(p => p.Level == Microsoft.Extensions.Logging.LogLevel.Warning); Assert.Equal("Unable to handle System.InvalidOperationException: Unknown", warning.FormattedMessage); _loggerProvider.ClearAllLogMessages(); - metrics = await _listener.GetMetricsAsync(); + metrics = (CosmosDBTriggerMetrics)await _listener.GetMonitor().GetMetricsAsync(); Assert.Equal(0, metrics.PartitionCount); Assert.Equal(0, metrics.RemainingWork); @@ -191,11 +192,11 @@ public void GetScaleStatus_NoMetrics_ReturnsVote_None() WorkerCount = 1 }; - var status = _listener.GetScaleStatus(context); + var status = _listener.GetMonitor().GetScaleStatus(context); Assert.Equal(ScaleVote.None, status.Vote); // verify the non-generic implementation works properly - status = ((IScaleMonitor)_listener).GetScaleStatus(context); + status = ((CosmosDBScaleMonitor)_listener.GetMonitor()).GetScaleStatus(context); Assert.Equal(ScaleVote.None, status.Vote); } @@ -217,7 +218,7 @@ public void GetScaleStatus_InstancesPerPartitionThresholdExceeded_ReturnsVote_Sc new CosmosDBTriggerMetrics { RemainingWork = 2900, PartitionCount = 1, Timestamp = timestamp.AddSeconds(15) }, }; - var status = _listener.GetScaleStatus(context); + var status = _listener.GetMonitor().GetScaleStatus(context); Assert.Equal(ScaleVote.ScaleIn, status.Vote); var logs = _loggerProvider.GetAllLogMessages().ToArray(); @@ -247,7 +248,7 @@ public void GetScaleStatus_MessagesPerWorkerThresholdExceeded_ReturnsVote_ScaleO new CosmosDBTriggerMetrics { RemainingWork = 2900, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, }; - var status = _listener.GetScaleStatus(context); + var status = _listener.GetMonitor().GetScaleStatus(context); Assert.Equal(ScaleVote.ScaleOut, status.Vote); var logs = _loggerProvider.GetAllLogMessages().ToArray(); @@ -277,7 +278,7 @@ public void GetScaleStatus_ConsistentRemainingWork_ReturnsVote_ScaleOut() new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 2, Timestamp = timestamp.AddSeconds(15) }, }; - var status = _listener.GetScaleStatus(context); + var status = _listener.GetMonitor().GetScaleStatus(context); Assert.Equal(ScaleVote.ScaleOut, status.Vote); var logs = _loggerProvider.GetAllLogMessages().ToArray(); @@ -307,7 +308,7 @@ public void GetScaleStatus_RemainingWorkIncreasing_ReturnsVote_ScaleOut() new CosmosDBTriggerMetrics { RemainingWork = 150, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, }; - var status = _listener.GetScaleStatus(context); + var status = _listener.GetMonitor().GetScaleStatus(context); Assert.Equal(ScaleVote.ScaleOut, status.Vote); var logs = _loggerProvider.GetAllLogMessages().ToArray(); @@ -334,7 +335,7 @@ public void GetScaleStatus_RemainingWorkDecreasing_ReturnsVote_ScaleIn() new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, }; - var status = _listener.GetScaleStatus(context); + var status = _listener.GetMonitor().GetScaleStatus(context); Assert.Equal(ScaleVote.ScaleIn, status.Vote); var logs = _loggerProvider.GetAllLogMessages().ToArray(); diff --git a/test/WebJobs.Extensions.CosmosDB.Tests/WebJobs.Extensions.CosmosDB.Tests.csproj b/test/WebJobs.Extensions.CosmosDB.Tests/WebJobs.Extensions.CosmosDB.Tests.csproj index a051f2141..5280ea94e 100644 --- a/test/WebJobs.Extensions.CosmosDB.Tests/WebJobs.Extensions.CosmosDB.Tests.csproj +++ b/test/WebJobs.Extensions.CosmosDB.Tests/WebJobs.Extensions.CosmosDB.Tests.csproj @@ -1,7 +1,7 @@  - netcoreapp2.1 + netcoreapp3.1 false Microsoft.Azure.WebJobs.Extensions.CosmosDB.Tests Microsoft.Azure.WebJobs.Extensions.CosmosDB.Tests From 574e3819ca9ec16f53f3efcb4d360bef19e53c80 Mon Sep 17 00:00:00 2001 From: Vincent Chiang Date: Tue, 1 Nov 2022 15:13:53 -0700 Subject: [PATCH 04/28] fix unit tests --- .../Trigger/CosmosDBMetricsProvider.cs | 4 ++-- .../Trigger/CosmosDBScaleMonitor.cs | 10 +++++---- .../Trigger/CosmosDBTriggerListener.cs | 2 +- .../Trigger/CosmosDBListenerTests.cs | 22 +++++++++---------- 4 files changed, 20 insertions(+), 18 deletions(-) diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBMetricsProvider.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBMetricsProvider.cs index 39e31eebc..7ef282618 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBMetricsProvider.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBMetricsProvider.cs @@ -33,9 +33,9 @@ internal class CosmosDBMetricsProvider { "The specified document collection is invalid", string.Empty } }; - public CosmosDBMetricsProvider(ILoggerFactory loggerFactory, Container monitoredContainer, Container leaseContainer, string processorName) + public CosmosDBMetricsProvider(ILogger logger, Container monitoredContainer, Container leaseContainer, string processorName) { - _logger = loggerFactory.CreateLogger(); + _logger = logger; _monitoredContainer = monitoredContainer; _leaseContainer = leaseContainer; _processorName = processorName; diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs index efbfc7dad..52f02caa6 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs @@ -20,20 +20,20 @@ public class CosmosDBScaleMonitor : IScaleMonitor private readonly ScaleMonitorDescriptor _scaleMonitorDescriptor; private readonly CosmosDBMetricsProvider _cosmosDBMetricsProvider; - public CosmosDBScaleMonitor(string functionId, ILoggerFactory loggerFactory, Container monitoredContainer, Container leaseContainer, string processorName) + public CosmosDBScaleMonitor(string functionId, ILogger logger, Container monitoredContainer, Container leaseContainer, string processorName) { - _logger = loggerFactory.CreateLogger(); + _logger = logger; _functionId = functionId; _monitoredContainer = monitoredContainer; _scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{_functionId}-CosmosDBTrigger-{_monitoredContainer.Database.Id}-{_monitoredContainer.Id}".ToLower()); - _cosmosDBMetricsProvider = new CosmosDBMetricsProvider(loggerFactory, monitoredContainer, leaseContainer, processorName); + _cosmosDBMetricsProvider = new CosmosDBMetricsProvider(logger, monitoredContainer, leaseContainer, processorName); } public ScaleMonitorDescriptor Descriptor => _scaleMonitorDescriptor; public ScaleStatus GetScaleStatus(ScaleStatusContext context) { - return GetScaleStatusCore(context.WorkerCount, context.Metrics?.Cast().ToArray()); + return GetScaleStatusCore(context.WorkerCount, context.Metrics?.ToArray()); } async Task IScaleMonitor.GetMetricsAsync() @@ -48,6 +48,8 @@ public async Task GetMetricsAsync() public ScaleStatus GetScaleStatus(ScaleStatusContext context) { + context.Metrics?.Cast().ToArray(); + return GetScaleStatusCore(context.WorkerCount, context.Metrics?.Cast().ToArray()); } diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs index 5a564f801..2762f5d09 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs @@ -60,7 +60,7 @@ public CosmosDBTriggerListener( $"leaseContainer='{this._leaseContainer.Id}', leaseDatabase='{this._leaseContainer.Database.Id}', functionId='{this._functionId}'"; // TODO: logger factory creation - this._cosmosDBScaleMonitor = new CosmosDBScaleMonitor(_functionId, new LoggerFactory(), _monitoredContainer, _leaseContainer, _processorName); + this._cosmosDBScaleMonitor = new CosmosDBScaleMonitor(_functionId, logger, _monitoredContainer, _leaseContainer, _processorName); } public ScaleMonitorDescriptor Descriptor => this._scaleMonitorDescriptor; diff --git a/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBListenerTests.cs b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBListenerTests.cs index 10115d2a1..770659de3 100644 --- a/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBListenerTests.cs +++ b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBListenerTests.cs @@ -76,7 +76,7 @@ public CosmosDBListenerTests() [Fact] public void ScaleMonitorDescriptor_ReturnsExpectedValue() { - Assert.Equal($"{_functionId}-cosmosdbtrigger-{DatabaseName}-{ContainerName}".ToLower(), _listener.Descriptor.Id); + Assert.Equal($"{_functionId}-cosmosdbtrigger-{DatabaseName}-{ContainerName}".ToLower(), _listener.GetMonitor().Descriptor.Id); } [Fact] @@ -96,7 +96,7 @@ public async Task GetMetrics_ReturnsExpectedResult() .Setup(m => m.ReadNextAsync(It.IsAny())) .Returns(Task.FromResult(response.Object)); - var metrics = (CosmosDBTriggerMetrics)await _listener.GetMonitor().GetMetricsAsync(); + var metrics = await ((CosmosDBScaleMonitor)_listener.GetMonitor()).GetMetricsAsync(); Assert.Equal(0, metrics.PartitionCount); Assert.Equal(0, metrics.RemainingWork); @@ -117,7 +117,7 @@ public async Task GetMetrics_ReturnsExpectedResult() new ChangeFeedProcessorState("d", 5, string.Empty) }.GetEnumerator()); - metrics = (CosmosDBTriggerMetrics)await _listener.GetMonitor().GetMetricsAsync(); + metrics = await ((CosmosDBScaleMonitor)_listener.GetMonitor()).GetMetricsAsync(); Assert.Equal(4, metrics.PartitionCount); Assert.Equal(20, metrics.RemainingWork); @@ -139,7 +139,7 @@ public async Task GetMetrics_ReturnsExpectedResult() }.GetEnumerator()); // verify non-generic interface works as expected - metrics = (CosmosDBTriggerMetrics)(await ((IScaleMonitor)_listener).GetMetricsAsync()); + metrics = (CosmosDBTriggerMetrics)await _listener.GetMonitor().GetMetricsAsync(); Assert.Equal(4, metrics.PartitionCount); Assert.Equal(20, metrics.RemainingWork); Assert.NotEqual(default(DateTime), metrics.Timestamp); @@ -192,11 +192,11 @@ public void GetScaleStatus_NoMetrics_ReturnsVote_None() WorkerCount = 1 }; - var status = _listener.GetMonitor().GetScaleStatus(context); + var status = ((CosmosDBScaleMonitor)_listener.GetMonitor()).GetScaleStatus(context); Assert.Equal(ScaleVote.None, status.Vote); // verify the non-generic implementation works properly - status = ((CosmosDBScaleMonitor)_listener.GetMonitor()).GetScaleStatus(context); + status = _listener.GetMonitor().GetScaleStatus(context); Assert.Equal(ScaleVote.None, status.Vote); } @@ -218,7 +218,7 @@ public void GetScaleStatus_InstancesPerPartitionThresholdExceeded_ReturnsVote_Sc new CosmosDBTriggerMetrics { RemainingWork = 2900, PartitionCount = 1, Timestamp = timestamp.AddSeconds(15) }, }; - var status = _listener.GetMonitor().GetScaleStatus(context); + var status = ((CosmosDBScaleMonitor)_listener.GetMonitor()).GetScaleStatus(context); Assert.Equal(ScaleVote.ScaleIn, status.Vote); var logs = _loggerProvider.GetAllLogMessages().ToArray(); @@ -248,7 +248,7 @@ public void GetScaleStatus_MessagesPerWorkerThresholdExceeded_ReturnsVote_ScaleO new CosmosDBTriggerMetrics { RemainingWork = 2900, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, }; - var status = _listener.GetMonitor().GetScaleStatus(context); + var status = ((CosmosDBScaleMonitor)_listener.GetMonitor()).GetScaleStatus(context); Assert.Equal(ScaleVote.ScaleOut, status.Vote); var logs = _loggerProvider.GetAllLogMessages().ToArray(); @@ -278,7 +278,7 @@ public void GetScaleStatus_ConsistentRemainingWork_ReturnsVote_ScaleOut() new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 2, Timestamp = timestamp.AddSeconds(15) }, }; - var status = _listener.GetMonitor().GetScaleStatus(context); + var status = ((CosmosDBScaleMonitor)_listener.GetMonitor()).GetScaleStatus(context); Assert.Equal(ScaleVote.ScaleOut, status.Vote); var logs = _loggerProvider.GetAllLogMessages().ToArray(); @@ -308,7 +308,7 @@ public void GetScaleStatus_RemainingWorkIncreasing_ReturnsVote_ScaleOut() new CosmosDBTriggerMetrics { RemainingWork = 150, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, }; - var status = _listener.GetMonitor().GetScaleStatus(context); + var status = ((CosmosDBScaleMonitor)_listener.GetMonitor()).GetScaleStatus(context); Assert.Equal(ScaleVote.ScaleOut, status.Vote); var logs = _loggerProvider.GetAllLogMessages().ToArray(); @@ -335,7 +335,7 @@ public void GetScaleStatus_RemainingWorkDecreasing_ReturnsVote_ScaleIn() new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, }; - var status = _listener.GetMonitor().GetScaleStatus(context); + var status = ((CosmosDBScaleMonitor)_listener.GetMonitor()).GetScaleStatus(context); Assert.Equal(ScaleVote.ScaleIn, status.Vote); var logs = _loggerProvider.GetAllLogMessages().ToArray(); From cdf1b597c124cb94dc4f635c280f8f750a5d5a0f Mon Sep 17 00:00:00 2001 From: Vincent Chiang Date: Tue, 1 Nov 2022 16:58:14 -0700 Subject: [PATCH 05/28] add cosmosdbtargetscaler and implement interface methods --- .../Trigger/CosmosDBTargetScaler.cs | 84 +++++++++++++++++++ .../WebJobs.Extensions.CosmosDB.csproj | 4 +- 2 files changed, 86 insertions(+), 2 deletions(-) create mode 100644 src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs new file mode 100644 index 000000000..5240439e6 --- /dev/null +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs @@ -0,0 +1,84 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Threading.Tasks; +using Microsoft.Azure.Cosmos; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Extensions.Logging; + +namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger +{ + public class CosmosDBTargetScaler : ITargetScaler + { + public const int DefaultMaxItemsPerInvocation = 100; + private readonly string _functionId; + private readonly TargetScalerDescriptor _targetScalerDescriptor; + private readonly CosmosDBMetricsProvider _cosmosDBMetricsProvider; + private readonly ILogger _logger; + private readonly CosmosDBTriggerAttribute _cosmosDBTriggerAttribute; + private readonly Container _monitoredContainer; + + public CosmosDBTargetScaler(string functionId, CosmosDBTriggerAttribute cosmosDBTriggerAttribute, Container monitoredContainer, Container leaseContainer, string processorName, ILogger logger) + { + _functionId = functionId; + _targetScalerDescriptor = new TargetScalerDescriptor(functionId); + _monitoredContainer = monitoredContainer; + _cosmosDBMetricsProvider = new CosmosDBMetricsProvider(logger, _monitoredContainer, leaseContainer, processorName); + _logger = logger; + _cosmosDBTriggerAttribute = cosmosDBTriggerAttribute; + } + + public TargetScalerDescriptor TargetScalerDescriptor => _targetScalerDescriptor; + + public async Task GetScaleResultAsync(TargetScalerContext context) + { + CosmosDBTriggerMetrics metrics = await _cosmosDBMetricsProvider.GetMetricsAsync(); + + return GetScaleResultInternal(context, metrics.RemainingWork, metrics.PartitionCount); + } + + internal TargetScalerResult GetScaleResultInternal(TargetScalerContext context, long remainingWork, int partitionCount) + { + int concurrency; + + if (!context.InstanceConcurrency.HasValue) + { + if (_cosmosDBTriggerAttribute.MaxItemsPerInvocation > 0) + { + concurrency = _cosmosDBTriggerAttribute.MaxItemsPerInvocation; + } + else + { + concurrency = DefaultMaxItemsPerInvocation; + } + } + else + { + concurrency = context.InstanceConcurrency.Value; + } + + if (concurrency <= 0) + { + throw new ArgumentOutOfRangeException("Concurrency value for target based scale must be greater than 0"); + } + + int targetWorkerCount = (int)Math.Ceiling(remainingWork / (decimal)concurrency); + + string targetScaleMessage = $"'Target worker count for function '{_functionId}' is '{targetWorkerCount}' (MonitoredContainerId='{_monitoredContainer.Id}', MonitoredContainerDatabaseId='{_monitoredContainer.Database.Id}', RemainingWork ='{remainingWork}', Concurrency='{concurrency}')."; + + if (partitionCount > 0 && targetWorkerCount > partitionCount) + { + targetScaleMessage += $" However, partition count is {partitionCount}. Adding more workers than partitions would not be helpful, so capping target worker count at {partitionCount}"; + targetWorkerCount = partitionCount; + } + + _logger.LogInformation(targetScaleMessage); + + return new TargetScalerResult + { + TargetWorkerCount = targetWorkerCount + }; + } + } +} diff --git a/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj b/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj index 8a3067b33..45458e3ee 100644 --- a/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj +++ b/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj @@ -1,6 +1,6 @@  - netstandard2.0 + netstandard2.1 Microsoft.Azure.WebJobs.Extensions.CosmosDB Microsoft.Azure.WebJobs.Extensions.CosmosDB Microsoft.Azure.WebJobs.Extensions.CosmosDB @@ -20,7 +20,7 @@ - + From 32e347933ae962f2a682f166c4b2e34a860d0ace Mon Sep 17 00:00:00 2001 From: Vincent Chiang Date: Tue, 1 Nov 2022 17:01:31 -0700 Subject: [PATCH 06/28] remove csproj upgrade -- for local dev only --- src/ExtensionsSample/ExtensionsSample.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ExtensionsSample/ExtensionsSample.csproj b/src/ExtensionsSample/ExtensionsSample.csproj index 3cb94854b..b1831bdc5 100644 --- a/src/ExtensionsSample/ExtensionsSample.csproj +++ b/src/ExtensionsSample/ExtensionsSample.csproj @@ -2,7 +2,7 @@ Exe - netcoreapp3.1 + netcoreapp2.1 latest $(NoWarn);8002 From 72534698d53aa0a7ef3a9049bbe437fac9554b51 Mon Sep 17 00:00:00 2001 From: Vincent Chiang Date: Tue, 1 Nov 2022 17:02:51 -0700 Subject: [PATCH 07/28] remove 3.1 upgrade in test csproj for local testing only --- .../WebJobs.Extensions.CosmosDB.Tests.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/WebJobs.Extensions.CosmosDB.Tests/WebJobs.Extensions.CosmosDB.Tests.csproj b/test/WebJobs.Extensions.CosmosDB.Tests/WebJobs.Extensions.CosmosDB.Tests.csproj index 5280ea94e..a051f2141 100644 --- a/test/WebJobs.Extensions.CosmosDB.Tests/WebJobs.Extensions.CosmosDB.Tests.csproj +++ b/test/WebJobs.Extensions.CosmosDB.Tests/WebJobs.Extensions.CosmosDB.Tests.csproj @@ -1,7 +1,7 @@  - netcoreapp3.1 + netcoreapp2.1 false Microsoft.Azure.WebJobs.Extensions.CosmosDB.Tests Microsoft.Azure.WebJobs.Extensions.CosmosDB.Tests From e413234d3c059373ff0202efd15c780cdf3e429b Mon Sep 17 00:00:00 2001 From: Vincent Chiang Date: Wed, 2 Nov 2022 10:39:21 -0700 Subject: [PATCH 08/28] change target framework for cdb proj back to 2.0 --- .../WebJobs.Extensions.CosmosDB.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj b/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj index 45458e3ee..632fc2f51 100644 --- a/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj +++ b/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj @@ -1,6 +1,6 @@  - netstandard2.1 + netstandard2.0 Microsoft.Azure.WebJobs.Extensions.CosmosDB Microsoft.Azure.WebJobs.Extensions.CosmosDB Microsoft.Azure.WebJobs.Extensions.CosmosDB From a78f6f53607665529eba64b00b2faf1b79d7519a Mon Sep 17 00:00:00 2001 From: Vincent Chiang Date: Wed, 2 Nov 2022 11:33:57 -0700 Subject: [PATCH 09/28] add unit tests for cosmosdbtargetscaler --- .../Trigger/CosmosDBTargetScaler.cs | 3 +- .../Trigger/CosmosDBTargetScalerTests.cs | 80 +++++++++++++++++++ 2 files changed, 82 insertions(+), 1 deletion(-) create mode 100644 test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBTargetScalerTests.cs diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs index 5240439e6..140182242 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs @@ -60,7 +60,8 @@ internal TargetScalerResult GetScaleResultInternal(TargetScalerContext context, if (concurrency <= 0) { - throw new ArgumentOutOfRangeException("Concurrency value for target based scale must be greater than 0"); + _logger.LogWarning($"Concurrency value for target based scale must be greater than 0. Using default value of {DefaultMaxItemsPerInvocation} as concurrency value."); + concurrency = DefaultMaxItemsPerInvocation; } int targetWorkerCount = (int)Math.Ceiling(remainingWork / (decimal)concurrency); diff --git a/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBTargetScalerTests.cs b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBTargetScalerTests.cs new file mode 100644 index 000000000..e994f655b --- /dev/null +++ b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBTargetScalerTests.cs @@ -0,0 +1,80 @@ +using Microsoft.Azure.Cosmos; +using Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger; +using Microsoft.Azure.WebJobs.Extensions.Tests.Common; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Extensions.Logging; +using Moq; +using Xunit; + +namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Tests.Trigger +{ + public class CosmosDBTargetScalerTests + { + private static readonly string DatabaseName = "testDb"; + private static readonly string ContainerName = "testContainer"; + private static readonly string ProcessorName = "theProcessor"; + + private readonly TestLoggerProvider _loggerProvider = new TestLoggerProvider(); + private readonly ILoggerFactory _loggerFactory; + private readonly Mock _monitoredContainer; + private readonly Mock _leasesContainer; + private readonly Mock> _estimatorIterator; + private readonly CosmosDBTriggerListener _listener; + private readonly string _functionId; + private readonly string _logDetails; + private readonly CosmosDBTargetScaler _targetScaler; + + private CosmosDBTriggerAttribute _attribute; + + public CosmosDBTargetScalerTests() + { + _loggerFactory = new LoggerFactory(); + _loggerFactory.AddProvider(_loggerProvider); + + _functionId = "testfunctionid"; + + var database = new Mock(MockBehavior.Strict); + database.Setup(d => d.Id).Returns(DatabaseName); + + _monitoredContainer = new Mock(MockBehavior.Strict); + _monitoredContainer.Setup(m => m.Id).Returns(ContainerName); + _monitoredContainer.Setup(m => m.Database).Returns(database.Object); + + _estimatorIterator = new Mock>(); + + Mock estimator = new Mock(); + estimator.Setup(m => m.GetCurrentStateIterator(It.IsAny())) + .Returns(_estimatorIterator.Object); + + _leasesContainer = new Mock(MockBehavior.Strict); + _leasesContainer.Setup(m => m.Id).Returns(ContainerName); + _leasesContainer.Setup(m => m.Database).Returns(database.Object); + + _monitoredContainer + .Setup(m => m.GetChangeFeedEstimator(It.Is(s => s == ProcessorName), It.Is(c => c == _leasesContainer.Object))) + .Returns(estimator.Object); + + _attribute = new CosmosDBTriggerAttribute(DatabaseName, ContainerName); + _targetScaler = new CosmosDBTargetScaler(_functionId, _attribute, _monitoredContainer.Object, _leasesContainer.Object, ProcessorName, _loggerFactory.CreateLogger>()); + } + + [Theory] + [InlineData(null, 200, 2, 2)] + [InlineData(null, 300, 2, 2)] + [InlineData(null, 300, 0, 3)] + [InlineData(null, 0, 3, 0)] + [InlineData(50, 200, 0, 4)] + [InlineData(-50, 200, 0, 2)] + public void GetScaleResultInternal(int? concurrency, long remainingWork, int partitionCount, int expectedTargetWorkerCount) + { + TargetScalerContext targetScalerContext = new TargetScalerContext + { + InstanceConcurrency = concurrency, + }; + + TargetScalerResult result = _targetScaler.GetScaleResultInternal(targetScalerContext, remainingWork, partitionCount); + + Assert.Equal(expectedTargetWorkerCount, result.TargetWorkerCount); + } + } +} From b704b018a7062b0b2737253bb65cedf8e9bc97cb Mon Sep 17 00:00:00 2001 From: Vincent Chiang Date: Wed, 2 Nov 2022 12:05:10 -0700 Subject: [PATCH 10/28] add tests for getscaleresultasync in cosmosdbtargetscaler --- .../Trigger/CosmosDBTriggerListener.cs | 1 - .../Trigger/CosmosDBTargetScalerTests.cs | 36 ++++++++++++++++++- 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs index 2762f5d09..a6a18275c 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs @@ -59,7 +59,6 @@ public CosmosDBTriggerListener( this._listenerLogDetails = $"prefix='{this._processorName}', monitoredContainer='{this._monitoredContainer.Id}', monitoredDatabase='{this._monitoredContainer.Database.Id}', " + $"leaseContainer='{this._leaseContainer.Id}', leaseDatabase='{this._leaseContainer.Database.Id}', functionId='{this._functionId}'"; - // TODO: logger factory creation this._cosmosDBScaleMonitor = new CosmosDBScaleMonitor(_functionId, logger, _monitoredContainer, _leaseContainer, _processorName); } diff --git a/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBTargetScalerTests.cs b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBTargetScalerTests.cs index e994f655b..7f969f595 100644 --- a/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBTargetScalerTests.cs +++ b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBTargetScalerTests.cs @@ -1,4 +1,9 @@ -using Microsoft.Azure.Cosmos; +using System; +using System.Collections; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.Cosmos; using Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger; using Microsoft.Azure.WebJobs.Extensions.Tests.Common; using Microsoft.Azure.WebJobs.Host.Scale; @@ -76,5 +81,34 @@ public void GetScaleResultInternal(int? concurrency, long remainingWork, int par Assert.Equal(expectedTargetWorkerCount, result.TargetWorkerCount); } + + [Fact] + public async Task GetScaleResultAsync() + { + TargetScalerContext targetScalerContext = new TargetScalerContext { }; + + _estimatorIterator + .SetupSequence(m => m.HasMoreResults) + .Returns(true) + .Returns(false); + + Mock> response = new Mock>(); + response + .Setup(m => m.GetEnumerator()) + .Returns(new List() + { + new ChangeFeedProcessorState("a", 100, string.Empty), + new ChangeFeedProcessorState("b", 100, string.Empty), + new ChangeFeedProcessorState("c", 50, string.Empty), + new ChangeFeedProcessorState("d", 100, string.Empty) + }.GetEnumerator()); + + _estimatorIterator + .Setup(m => m.ReadNextAsync(It.IsAny())) + .Returns(Task.FromResult(response.Object)); + + TargetScalerResult result = await _targetScaler.GetScaleResultAsync(targetScalerContext); + Assert.Equal(4, result.TargetWorkerCount); + } } } From eb3d7744edb2de9ebfd170744068f7afa1309078 Mon Sep 17 00:00:00 2001 From: Vincent Chiang Date: Wed, 2 Nov 2022 15:49:50 -0700 Subject: [PATCH 11/28] upgrade netcoreapp version to 3.1 --- src/ExtensionsSample/ExtensionsSample.csproj | 2 +- .../WebJobs.Extensions.CosmosDB.Tests.csproj | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ExtensionsSample/ExtensionsSample.csproj b/src/ExtensionsSample/ExtensionsSample.csproj index b1831bdc5..3cb94854b 100644 --- a/src/ExtensionsSample/ExtensionsSample.csproj +++ b/src/ExtensionsSample/ExtensionsSample.csproj @@ -2,7 +2,7 @@ Exe - netcoreapp2.1 + netcoreapp3.1 latest $(NoWarn);8002 diff --git a/test/WebJobs.Extensions.CosmosDB.Tests/WebJobs.Extensions.CosmosDB.Tests.csproj b/test/WebJobs.Extensions.CosmosDB.Tests/WebJobs.Extensions.CosmosDB.Tests.csproj index a051f2141..5280ea94e 100644 --- a/test/WebJobs.Extensions.CosmosDB.Tests/WebJobs.Extensions.CosmosDB.Tests.csproj +++ b/test/WebJobs.Extensions.CosmosDB.Tests/WebJobs.Extensions.CosmosDB.Tests.csproj @@ -1,7 +1,7 @@  - netcoreapp2.1 + netcoreapp3.1 false Microsoft.Azure.WebJobs.Extensions.CosmosDB.Tests Microsoft.Azure.WebJobs.Extensions.CosmosDB.Tests From ff576422f4c2f83b4e799b2fb3e5cb4b1fb47268 Mon Sep 17 00:00:00 2001 From: Vincent Chiang Date: Thu, 3 Nov 2022 09:48:54 -0700 Subject: [PATCH 12/28] make cdbtriggerlistener implement itargetscaler --- .../Trigger/CosmosDBTriggerListener.cs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs index a6a18275c..48ca72b22 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs @@ -15,7 +15,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB { - internal class CosmosDBTriggerListener : IListener, IScaleMonitorProvider + internal class CosmosDBTriggerListener : IListener, IScaleMonitorProvider, ITargetScalerProvider { private const int ListenerNotRegistered = 0; private const int ListenerRegistering = 1; @@ -33,6 +33,7 @@ internal class CosmosDBTriggerListener : IListener, IScaleMonitorProvider private readonly CosmosDBTriggerHealthMonitor _healthMonitor; private readonly string _listenerLogDetails; private readonly IScaleMonitor _cosmosDBScaleMonitor; + private readonly ITargetScaler _cosmosDBTargetScaler; private ChangeFeedProcessor _host; private ChangeFeedProcessorBuilder _hostBuilder; private int _listenerStatus; @@ -60,6 +61,7 @@ public CosmosDBTriggerListener( $"leaseContainer='{this._leaseContainer.Id}', leaseDatabase='{this._leaseContainer.Database.Id}', functionId='{this._functionId}'"; this._cosmosDBScaleMonitor = new CosmosDBScaleMonitor(_functionId, logger, _monitoredContainer, _leaseContainer, _processorName); + this._cosmosDBTargetScaler = new CosmosDBTargetScaler(_functionId, _cosmosDBAttribute, _monitoredContainer, _leaseContainer, _processorName, _logger); } public ScaleMonitorDescriptor Descriptor => this._scaleMonitorDescriptor; @@ -223,5 +225,10 @@ public IScaleMonitor GetMonitor() { return _cosmosDBScaleMonitor; } + + public ITargetScaler GetTargetScaler() + { + return _cosmosDBTargetScaler; + } } } From b6eef5b0ae2fbc15a3bf346a1fffabf64d181476 Mon Sep 17 00:00:00 2001 From: Vincent Chiang Date: Mon, 7 Nov 2022 13:28:29 -0800 Subject: [PATCH 13/28] make targetscaler, scalemonitor, and scalemetrics back to internal for now --- src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs | 2 +- src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs | 2 +- .../Trigger/CosmosDBTriggerMetrics.cs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs index 52f02caa6..e2f673672 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs @@ -12,7 +12,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger { - public class CosmosDBScaleMonitor : IScaleMonitor + internal class CosmosDBScaleMonitor : IScaleMonitor { private readonly ILogger _logger; private readonly string _functionId; diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs index 140182242..35a2f2a52 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs @@ -19,7 +19,7 @@ public class CosmosDBTargetScaler : ITargetScaler private readonly CosmosDBTriggerAttribute _cosmosDBTriggerAttribute; private readonly Container _monitoredContainer; - public CosmosDBTargetScaler(string functionId, CosmosDBTriggerAttribute cosmosDBTriggerAttribute, Container monitoredContainer, Container leaseContainer, string processorName, ILogger logger) + internal CosmosDBTargetScaler(string functionId, CosmosDBTriggerAttribute cosmosDBTriggerAttribute, Container monitoredContainer, Container leaseContainer, string processorName, ILogger logger) { _functionId = functionId; _targetScalerDescriptor = new TargetScalerDescriptor(functionId); diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerMetrics.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerMetrics.cs index d45cbcb0e..65c3c5d29 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerMetrics.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerMetrics.cs @@ -5,7 +5,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB { - public class CosmosDBTriggerMetrics : ScaleMetrics + internal class CosmosDBTriggerMetrics : ScaleMetrics { public int PartitionCount { get; set; } From cfb12408542490ae2ccaa691bd23963c7c9d33de Mon Sep 17 00:00:00 2001 From: Vincent Chiang Date: Mon, 7 Nov 2022 13:29:59 -0800 Subject: [PATCH 14/28] make targetscaler internal --- src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs index 35a2f2a52..e3c8d575d 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs @@ -9,7 +9,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger { - public class CosmosDBTargetScaler : ITargetScaler + internal class CosmosDBTargetScaler : ITargetScaler { public const int DefaultMaxItemsPerInvocation = 100; private readonly string _functionId; From 6edfd38b19ffeb1514fdcfe5bd1608967271c2e9 Mon Sep 17 00:00:00 2001 From: Vincent Chiang Date: Wed, 16 Nov 2022 21:26:28 -0800 Subject: [PATCH 15/28] Update src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs Co-authored-by: Alexey Rodionov --- .../Trigger/CosmosDBTargetScaler.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs index e3c8d575d..10b423a8a 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs @@ -44,7 +44,8 @@ internal TargetScalerResult GetScaleResultInternal(TargetScalerContext context, if (!context.InstanceConcurrency.HasValue) { - if (_cosmosDBTriggerAttribute.MaxItemsPerInvocation > 0) +concurrency = _cosmosDBTriggerAttribute.MaxItemsPerInvocation > 0 ? _cosmosDBTriggerAttribute.MaxItemsPerInvocation : DefaultMaxItemsPerInvocation; + { concurrency = _cosmosDBTriggerAttribute.MaxItemsPerInvocation; } From 3d30b677699bfcf4c23c27dd8623369e21a24af1 Mon Sep 17 00:00:00 2001 From: Vincent Chiang Date: Wed, 16 Nov 2022 21:29:16 -0800 Subject: [PATCH 16/28] use ternary operator when deciding which concurrency value to use --- .../Trigger/CosmosDBTargetScaler.cs | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs index 10b423a8a..294b6d2a4 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs @@ -44,15 +44,7 @@ internal TargetScalerResult GetScaleResultInternal(TargetScalerContext context, if (!context.InstanceConcurrency.HasValue) { -concurrency = _cosmosDBTriggerAttribute.MaxItemsPerInvocation > 0 ? _cosmosDBTriggerAttribute.MaxItemsPerInvocation : DefaultMaxItemsPerInvocation; - - { - concurrency = _cosmosDBTriggerAttribute.MaxItemsPerInvocation; - } - else - { - concurrency = DefaultMaxItemsPerInvocation; - } + concurrency = _cosmosDBTriggerAttribute.MaxItemsPerInvocation > 0 ? _cosmosDBTriggerAttribute.MaxItemsPerInvocation : DefaultMaxItemsPerInvocation; } else { From fc197fa1ee45f96b5a5242af11b8624f0ace1273 Mon Sep 17 00:00:00 2001 From: Vincent Chiang Date: Wed, 16 Nov 2022 21:30:01 -0800 Subject: [PATCH 17/28] remove extra quotation in target --- src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs index 294b6d2a4..352752572 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs @@ -59,7 +59,7 @@ internal TargetScalerResult GetScaleResultInternal(TargetScalerContext context, int targetWorkerCount = (int)Math.Ceiling(remainingWork / (decimal)concurrency); - string targetScaleMessage = $"'Target worker count for function '{_functionId}' is '{targetWorkerCount}' (MonitoredContainerId='{_monitoredContainer.Id}', MonitoredContainerDatabaseId='{_monitoredContainer.Database.Id}', RemainingWork ='{remainingWork}', Concurrency='{concurrency}')."; + string targetScaleMessage = $"Target worker count for function '{_functionId}' is '{targetWorkerCount}' (MonitoredContainerId='{_monitoredContainer.Id}', MonitoredContainerDatabaseId='{_monitoredContainer.Database.Id}', RemainingWork ='{remainingWork}', Concurrency='{concurrency}')."; if (partitionCount > 0 && targetWorkerCount > partitionCount) { From 5d86627d9ab33e25f2b71699e70192a28df04414 Mon Sep 17 00:00:00 2001 From: Vincent Chiang Date: Wed, 16 Nov 2022 21:45:14 -0800 Subject: [PATCH 18/28] move unit tests from listener class to new metrics provider class --- .../Trigger/CosmosDBListenerTests.cs | 105 ----------- .../Trigger/CosmosDBMetricsProviderTests.cs | 170 ++++++++++++++++++ 2 files changed, 170 insertions(+), 105 deletions(-) create mode 100644 test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBMetricsProviderTests.cs diff --git a/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBListenerTests.cs b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBListenerTests.cs index 770659de3..fe122274a 100644 --- a/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBListenerTests.cs +++ b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBListenerTests.cs @@ -79,111 +79,6 @@ public void ScaleMonitorDescriptor_ReturnsExpectedValue() Assert.Equal($"{_functionId}-cosmosdbtrigger-{DatabaseName}-{ContainerName}".ToLower(), _listener.GetMonitor().Descriptor.Id); } - [Fact] - public async Task GetMetrics_ReturnsExpectedResult() - { - _estimatorIterator - .SetupSequence(m => m.HasMoreResults) - .Returns(true) - .Returns(false); - - Mock> response = new Mock>(); - response - .Setup(m => m.GetEnumerator()) - .Returns(new List().GetEnumerator()); - - _estimatorIterator - .Setup(m => m.ReadNextAsync(It.IsAny())) - .Returns(Task.FromResult(response.Object)); - - var metrics = await ((CosmosDBScaleMonitor)_listener.GetMonitor()).GetMetricsAsync(); - - Assert.Equal(0, metrics.PartitionCount); - Assert.Equal(0, metrics.RemainingWork); - Assert.NotEqual(default(DateTime), metrics.Timestamp); - - _estimatorIterator - .SetupSequence(m => m.HasMoreResults) - .Returns(true) - .Returns(false); - - response - .Setup(m => m.GetEnumerator()) - .Returns(new List() - { - new ChangeFeedProcessorState("a", 5, string.Empty), - new ChangeFeedProcessorState("b", 5, string.Empty), - new ChangeFeedProcessorState("c", 5, string.Empty), - new ChangeFeedProcessorState("d", 5, string.Empty) - }.GetEnumerator()); - - metrics = await ((CosmosDBScaleMonitor)_listener.GetMonitor()).GetMetricsAsync(); - - Assert.Equal(4, metrics.PartitionCount); - Assert.Equal(20, metrics.RemainingWork); - Assert.NotEqual(default(DateTime), metrics.Timestamp); - - _estimatorIterator - .SetupSequence(m => m.HasMoreResults) - .Returns(true) - .Returns(false); - - response - .Setup(m => m.GetEnumerator()) - .Returns(new List() - { - new ChangeFeedProcessorState("a", 5, string.Empty), - new ChangeFeedProcessorState("b", 5, string.Empty), - new ChangeFeedProcessorState("c", 5, string.Empty), - new ChangeFeedProcessorState("d", 5, string.Empty) - }.GetEnumerator()); - - // verify non-generic interface works as expected - metrics = (CosmosDBTriggerMetrics)await _listener.GetMonitor().GetMetricsAsync(); - Assert.Equal(4, metrics.PartitionCount); - Assert.Equal(20, metrics.RemainingWork); - Assert.NotEqual(default(DateTime), metrics.Timestamp); - } - - [Fact] - public async Task GetMetrics_HandlesExceptions() - { - // Can't test DocumentClientExceptions because they can't be constructed. - _estimatorIterator - .Setup(m => m.HasMoreResults).Returns(true); - - _estimatorIterator - .SetupSequence(m => m.ReadNextAsync(It.IsAny())) - .ThrowsAsync(new CosmosException("Resource not found", HttpStatusCode.NotFound, 0, string.Empty, 0)) - .ThrowsAsync(new InvalidOperationException("Unknown")) - .ThrowsAsync(new HttpRequestException("Uh oh", new System.Net.WebException("Uh oh again", WebExceptionStatus.NameResolutionFailure))); - - var metrics = (CosmosDBTriggerMetrics)await _listener.GetMonitor().GetMetricsAsync(); - - Assert.Equal(0, metrics.PartitionCount); - Assert.Equal(0, metrics.RemainingWork); - Assert.NotEqual(default(DateTime), metrics.Timestamp); - - var warning = _loggerProvider.GetAllLogMessages().Single(p => p.Level == Microsoft.Extensions.Logging.LogLevel.Warning); - Assert.Equal("Please check that the CosmosDB container and leases container exist and are listed correctly in Functions config files.", warning.FormattedMessage); - _loggerProvider.ClearAllLogMessages(); - - await Assert.ThrowsAsync(async () => await _listener.GetMonitor().GetMetricsAsync()); - - warning = _loggerProvider.GetAllLogMessages().Single(p => p.Level == Microsoft.Extensions.Logging.LogLevel.Warning); - Assert.Equal("Unable to handle System.InvalidOperationException: Unknown", warning.FormattedMessage); - _loggerProvider.ClearAllLogMessages(); - - metrics = (CosmosDBTriggerMetrics)await _listener.GetMonitor().GetMetricsAsync(); - - Assert.Equal(0, metrics.PartitionCount); - Assert.Equal(0, metrics.RemainingWork); - Assert.NotEqual(default(DateTime), metrics.Timestamp); - - warning = _loggerProvider.GetAllLogMessages().Single(p => p.Level == Microsoft.Extensions.Logging.LogLevel.Warning); - Assert.Equal("CosmosDBTrigger Exception message: Uh oh again.", warning.FormattedMessage); - } - [Fact] public void GetScaleStatus_NoMetrics_ReturnsVote_None() { diff --git a/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBMetricsProviderTests.cs b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBMetricsProviderTests.cs new file mode 100644 index 000000000..cf71f34c5 --- /dev/null +++ b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBMetricsProviderTests.cs @@ -0,0 +1,170 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Net.Http; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.Cosmos; +using Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger; +using Microsoft.Azure.WebJobs.Extensions.Tests.Common; +using Microsoft.Azure.WebJobs.Host.Executors; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Moq; +using Xunit; + +namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Tests.Trigger +{ + public class CosmosDBMetricsProviderTests + { + private static readonly string DatabaseName = "testDb"; + private static readonly string ContainerName = "testContainer"; + private static readonly string ProcessorName = "theProcessor"; + + private readonly ILoggerFactory _loggerFactory; + private readonly TestLoggerProvider _loggerProvider = new TestLoggerProvider(); + private readonly Mock> _estimatorIterator; + private readonly Mock _monitoredContainer; + private readonly Mock _leasesContainer; + private readonly CosmosDBMetricsProvider _cosmosDbMetricsProvider; + + public CosmosDBMetricsProviderTests() + { + _loggerFactory = new LoggerFactory(); + _loggerFactory.AddProvider(_loggerProvider); + + var database = new Mock(MockBehavior.Strict); + database.Setup(d => d.Id).Returns(DatabaseName); + + _monitoredContainer = new Mock(MockBehavior.Strict); + _monitoredContainer.Setup(m => m.Id).Returns(ContainerName); + _monitoredContainer.Setup(m => m.Database).Returns(database.Object); + + _estimatorIterator = new Mock>(); + + Mock estimator = new Mock(); + estimator.Setup(m => m.GetCurrentStateIterator(It.IsAny())) + .Returns(_estimatorIterator.Object); + + _leasesContainer = new Mock(MockBehavior.Strict); + _leasesContainer.Setup(m => m.Id).Returns(ContainerName); + _leasesContainer.Setup(m => m.Database).Returns(database.Object); + + _monitoredContainer + .Setup(m => m.GetChangeFeedEstimator(It.Is(s => s == ProcessorName), It.Is(c => c == _leasesContainer.Object))) + .Returns(estimator.Object); + + _cosmosDbMetricsProvider = new CosmosDBMetricsProvider(_loggerFactory.CreateLogger(), _monitoredContainer.Object, _leasesContainer.Object, ProcessorName); + } + + [Fact] + public async Task GetMetrics_ReturnsExpectedResult() + { + _estimatorIterator + .SetupSequence(m => m.HasMoreResults) + .Returns(true) + .Returns(false); + + Mock> response = new Mock>(); + response + .Setup(m => m.GetEnumerator()) + .Returns(new List().GetEnumerator()); + + _estimatorIterator + .Setup(m => m.ReadNextAsync(It.IsAny())) + .Returns(Task.FromResult(response.Object)); + + var metrics = await _cosmosDbMetricsProvider.GetMetricsAsync(); + + Assert.Equal(0, metrics.PartitionCount); + Assert.Equal(0, metrics.RemainingWork); + Assert.NotEqual(default(DateTime), metrics.Timestamp); + + _estimatorIterator + .SetupSequence(m => m.HasMoreResults) + .Returns(true) + .Returns(false); + + response + .Setup(m => m.GetEnumerator()) + .Returns(new List() + { + new ChangeFeedProcessorState("a", 5, string.Empty), + new ChangeFeedProcessorState("b", 5, string.Empty), + new ChangeFeedProcessorState("c", 5, string.Empty), + new ChangeFeedProcessorState("d", 5, string.Empty) + }.GetEnumerator()); + + metrics = await _cosmosDbMetricsProvider.GetMetricsAsync(); + + Assert.Equal(4, metrics.PartitionCount); + Assert.Equal(20, metrics.RemainingWork); + Assert.NotEqual(default(DateTime), metrics.Timestamp); + + _estimatorIterator + .SetupSequence(m => m.HasMoreResults) + .Returns(true) + .Returns(false); + + response + .Setup(m => m.GetEnumerator()) + .Returns(new List() + { + new ChangeFeedProcessorState("a", 5, string.Empty), + new ChangeFeedProcessorState("b", 5, string.Empty), + new ChangeFeedProcessorState("c", 5, string.Empty), + new ChangeFeedProcessorState("d", 5, string.Empty) + }.GetEnumerator()); + + // verify non-generic interface works as expected + metrics = (CosmosDBTriggerMetrics)await _cosmosDbMetricsProvider.GetMetricsAsync(); + Assert.Equal(4, metrics.PartitionCount); + Assert.Equal(20, metrics.RemainingWork); + Assert.NotEqual(default(DateTime), metrics.Timestamp); + } + + [Fact] + public async Task GetMetrics_HandlesExceptions() + { + // Can't test DocumentClientExceptions because they can't be constructed. + _estimatorIterator + .Setup(m => m.HasMoreResults).Returns(true); + + _estimatorIterator + .SetupSequence(m => m.ReadNextAsync(It.IsAny())) + .ThrowsAsync(new CosmosException("Resource not found", HttpStatusCode.NotFound, 0, string.Empty, 0)) + .ThrowsAsync(new InvalidOperationException("Unknown")) + .ThrowsAsync(new HttpRequestException("Uh oh", new System.Net.WebException("Uh oh again", WebExceptionStatus.NameResolutionFailure))); + + var metrics = (CosmosDBTriggerMetrics)await _cosmosDbMetricsProvider.GetMetricsAsync(); + + Assert.Equal(0, metrics.PartitionCount); + Assert.Equal(0, metrics.RemainingWork); + Assert.NotEqual(default(DateTime), metrics.Timestamp); + + var warning = _loggerProvider.GetAllLogMessages().Single(p => p.Level == Microsoft.Extensions.Logging.LogLevel.Warning); + Assert.Equal("Please check that the CosmosDB container and leases container exist and are listed correctly in Functions config files.", warning.FormattedMessage); + _loggerProvider.ClearAllLogMessages(); + + await Assert.ThrowsAsync(async () => await _cosmosDbMetricsProvider.GetMetricsAsync()); + + warning = _loggerProvider.GetAllLogMessages().Single(p => p.Level == Microsoft.Extensions.Logging.LogLevel.Warning); + Assert.Equal("Unable to handle System.InvalidOperationException: Unknown", warning.FormattedMessage); + _loggerProvider.ClearAllLogMessages(); + + metrics = (CosmosDBTriggerMetrics)await _cosmosDbMetricsProvider.GetMetricsAsync(); + + Assert.Equal(0, metrics.PartitionCount); + Assert.Equal(0, metrics.RemainingWork); + Assert.NotEqual(default(DateTime), metrics.Timestamp); + + warning = _loggerProvider.GetAllLogMessages().Single(p => p.Level == Microsoft.Extensions.Logging.LogLevel.Warning); + Assert.Equal("CosmosDBTrigger Exception message: Uh oh again.", warning.FormattedMessage); + } + } +} From 2e63b30af1c025ac7599b3a19603b30c2c28b04b Mon Sep 17 00:00:00 2001 From: Vincent Chiang Date: Wed, 16 Nov 2022 21:59:43 -0800 Subject: [PATCH 19/28] moved cosmosdb scalemonitor tests to separate test class --- .../Trigger/CosmosDBListenerTests.cs | 166 ------------ .../Trigger/CosmosDBMetricsProviderTests.cs | 3 - .../Trigger/CosmosDBScaleMonitorTests.cs | 241 ++++++++++++++++++ 3 files changed, 241 insertions(+), 169 deletions(-) create mode 100644 test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBScaleMonitorTests.cs diff --git a/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBListenerTests.cs b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBListenerTests.cs index fe122274a..326db586d 100644 --- a/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBListenerTests.cs +++ b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBListenerTests.cs @@ -73,172 +73,6 @@ public CosmosDBListenerTests() $"leaseContainer='{ContainerName}', leaseDatabase='{DatabaseName}', functionId='{this._functionId}'"; } - [Fact] - public void ScaleMonitorDescriptor_ReturnsExpectedValue() - { - Assert.Equal($"{_functionId}-cosmosdbtrigger-{DatabaseName}-{ContainerName}".ToLower(), _listener.GetMonitor().Descriptor.Id); - } - - [Fact] - public void GetScaleStatus_NoMetrics_ReturnsVote_None() - { - var context = new ScaleStatusContext - { - WorkerCount = 1 - }; - - var status = ((CosmosDBScaleMonitor)_listener.GetMonitor()).GetScaleStatus(context); - Assert.Equal(ScaleVote.None, status.Vote); - - // verify the non-generic implementation works properly - status = _listener.GetMonitor().GetScaleStatus(context); - Assert.Equal(ScaleVote.None, status.Vote); - } - - [Fact] - public void GetScaleStatus_InstancesPerPartitionThresholdExceeded_ReturnsVote_ScaleIn() - { - var context = new ScaleStatusContext - { - WorkerCount = 2 - }; - var timestamp = DateTime.UtcNow; - context.Metrics = new List - { - new CosmosDBTriggerMetrics { RemainingWork = 2500, PartitionCount = 1, Timestamp = timestamp.AddSeconds(15) }, - new CosmosDBTriggerMetrics { RemainingWork = 2505, PartitionCount = 1, Timestamp = timestamp.AddSeconds(15) }, - new CosmosDBTriggerMetrics { RemainingWork = 2612, PartitionCount = 1, Timestamp = timestamp.AddSeconds(15) }, - new CosmosDBTriggerMetrics { RemainingWork = 2700, PartitionCount = 1, Timestamp = timestamp.AddSeconds(15) }, - new CosmosDBTriggerMetrics { RemainingWork = 2810, PartitionCount = 1, Timestamp = timestamp.AddSeconds(15) }, - new CosmosDBTriggerMetrics { RemainingWork = 2900, PartitionCount = 1, Timestamp = timestamp.AddSeconds(15) }, - }; - - var status = ((CosmosDBScaleMonitor)_listener.GetMonitor()).GetScaleStatus(context); - Assert.Equal(ScaleVote.ScaleIn, status.Vote); - - var logs = _loggerProvider.GetAllLogMessages().ToArray(); - var log = logs[0]; - Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level); - Assert.Equal("WorkerCount (2) > PartitionCount (1).", log.FormattedMessage); - log = logs[1]; - Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level); - Assert.Equal($"Number of instances (2) is too high relative to number of partitions for container ({ContainerName}, 1).", log.FormattedMessage); - } - - [Fact] - public void GetScaleStatus_MessagesPerWorkerThresholdExceeded_ReturnsVote_ScaleOut() - { - var context = new ScaleStatusContext - { - WorkerCount = 1 - }; - var timestamp = DateTime.UtcNow; - context.Metrics = new List - { - new CosmosDBTriggerMetrics { RemainingWork = 2500, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, - new CosmosDBTriggerMetrics { RemainingWork = 2505, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, - new CosmosDBTriggerMetrics { RemainingWork = 2612, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, - new CosmosDBTriggerMetrics { RemainingWork = 2700, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, - new CosmosDBTriggerMetrics { RemainingWork = 2810, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, - new CosmosDBTriggerMetrics { RemainingWork = 2900, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, - }; - - var status = ((CosmosDBScaleMonitor)_listener.GetMonitor()).GetScaleStatus(context); - Assert.Equal(ScaleVote.ScaleOut, status.Vote); - - var logs = _loggerProvider.GetAllLogMessages().ToArray(); - var log = logs[0]; - Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level); - Assert.Equal("RemainingWork (2900) > WorkerCount (1) * 1,000.", log.FormattedMessage); - log = logs[1]; - Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level); - Assert.Equal($"Remaining work for container ({ContainerName}, 2900) is too high relative to the number of instances (1).", log.FormattedMessage); - } - - [Fact] - public void GetScaleStatus_ConsistentRemainingWork_ReturnsVote_ScaleOut() - { - var context = new ScaleStatusContext - { - WorkerCount = 1 - }; - var timestamp = DateTime.UtcNow; - context.Metrics = new List - { - new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 2, Timestamp = timestamp.AddSeconds(15) }, - new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 2, Timestamp = timestamp.AddSeconds(15) }, - new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 2, Timestamp = timestamp.AddSeconds(15) }, - new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 2, Timestamp = timestamp.AddSeconds(15) }, - new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 2, Timestamp = timestamp.AddSeconds(15) }, - new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 2, Timestamp = timestamp.AddSeconds(15) }, - }; - - var status = ((CosmosDBScaleMonitor)_listener.GetMonitor()).GetScaleStatus(context); - Assert.Equal(ScaleVote.ScaleOut, status.Vote); - - var logs = _loggerProvider.GetAllLogMessages().ToArray(); - var log = logs[0]; - Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level); - Assert.Equal($"CosmosDB container '{ContainerName}' has documents waiting to be processed.", log.FormattedMessage); - log = logs[1]; - Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level); - Assert.Equal("There are 1 instances relative to 2 partitions.", log.FormattedMessage); - } - - [Fact] - public void GetScaleStatus_RemainingWorkIncreasing_ReturnsVote_ScaleOut() - { - var context = new ScaleStatusContext - { - WorkerCount = 1 - }; - var timestamp = DateTime.UtcNow; - context.Metrics = new List - { - new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, - new CosmosDBTriggerMetrics { RemainingWork = 20, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, - new CosmosDBTriggerMetrics { RemainingWork = 40, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, - new CosmosDBTriggerMetrics { RemainingWork = 80, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, - new CosmosDBTriggerMetrics { RemainingWork = 100, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, - new CosmosDBTriggerMetrics { RemainingWork = 150, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, - }; - - var status = ((CosmosDBScaleMonitor)_listener.GetMonitor()).GetScaleStatus(context); - Assert.Equal(ScaleVote.ScaleOut, status.Vote); - - var logs = _loggerProvider.GetAllLogMessages().ToArray(); - var log = logs[0]; - Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level); - Assert.Equal($"Remaining work is increasing for '{ContainerName}'.", log.FormattedMessage); - } - - [Fact] - public void GetScaleStatus_RemainingWorkDecreasing_ReturnsVote_ScaleIn() - { - var context = new ScaleStatusContext - { - WorkerCount = 1 - }; - var timestamp = DateTime.UtcNow; - context.Metrics = new List - { - new CosmosDBTriggerMetrics { RemainingWork = 150, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, - new CosmosDBTriggerMetrics { RemainingWork = 100, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, - new CosmosDBTriggerMetrics { RemainingWork = 80, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, - new CosmosDBTriggerMetrics { RemainingWork = 40, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, - new CosmosDBTriggerMetrics { RemainingWork = 20, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, - new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, - }; - - var status = ((CosmosDBScaleMonitor)_listener.GetMonitor()).GetScaleStatus(context); - Assert.Equal(ScaleVote.ScaleIn, status.Vote); - - var logs = _loggerProvider.GetAllLogMessages().ToArray(); - var log = logs[0]; - Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level); - Assert.Equal($"Remaining work is decreasing for '{ContainerName}'.", log.FormattedMessage); - } - [Fact] public async Task StartAsync_Retries() { diff --git a/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBMetricsProviderTests.cs b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBMetricsProviderTests.cs index cf71f34c5..6da53c646 100644 --- a/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBMetricsProviderTests.cs +++ b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBMetricsProviderTests.cs @@ -11,10 +11,7 @@ using Microsoft.Azure.Cosmos; using Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger; using Microsoft.Azure.WebJobs.Extensions.Tests.Common; -using Microsoft.Azure.WebJobs.Host.Executors; -using Microsoft.Azure.WebJobs.Host.Scale; using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Logging.Abstractions; using Moq; using Xunit; diff --git a/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBScaleMonitorTests.cs b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBScaleMonitorTests.cs new file mode 100644 index 000000000..fa436bc8f --- /dev/null +++ b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBScaleMonitorTests.cs @@ -0,0 +1,241 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Net.Http; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.Cosmos; +using Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger; +using Microsoft.Azure.WebJobs.Extensions.Tests.Common; +using Microsoft.Azure.WebJobs.Host.Executors; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using Moq; +using Xunit; + +namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Tests.Trigger +{ + public class CosmosDBScaleMonitorTests + { + private static readonly string DatabaseName = "testDb"; + private static readonly string ContainerName = "testContainer"; + private static readonly string ProcessorName = "theProcessor"; + + private readonly TestLoggerProvider _loggerProvider = new TestLoggerProvider(); + private readonly ILoggerFactory _loggerFactory; + private readonly Mock _mockExecutor; + private readonly Mock _monitoredContainer; + private readonly Mock _leasesContainer; + private readonly Mock> _estimatorIterator; + private readonly string _functionId; + private readonly string _logDetails; + private readonly IScaleMonitor _scaleMonitor; + + public CosmosDBScaleMonitorTests() + { + _loggerFactory = new LoggerFactory(); + _loggerFactory.AddProvider(_loggerProvider); + + _mockExecutor = new Mock(); + _functionId = "testfunctionid"; + + var database = new Mock(MockBehavior.Strict); + database.Setup(d => d.Id).Returns(DatabaseName); + + _monitoredContainer = new Mock(MockBehavior.Strict); + _monitoredContainer.Setup(m => m.Id).Returns(ContainerName); + _monitoredContainer.Setup(m => m.Database).Returns(database.Object); + + _estimatorIterator = new Mock>(); + + Mock estimator = new Mock(); + estimator.Setup(m => m.GetCurrentStateIterator(It.IsAny())) + .Returns(_estimatorIterator.Object); + + _leasesContainer = new Mock(MockBehavior.Strict); + _leasesContainer.Setup(m => m.Id).Returns(ContainerName); + _leasesContainer.Setup(m => m.Database).Returns(database.Object); + + _monitoredContainer + .Setup(m => m.GetChangeFeedEstimator(It.Is(s => s == ProcessorName), It.Is(c => c == _leasesContainer.Object))) + .Returns(estimator.Object); + + var attribute = new CosmosDBTriggerAttribute(DatabaseName, ContainerName); + + _logDetails = $"prefix='{ProcessorName}', monitoredContainer='{ContainerName}', monitoredDatabase='{DatabaseName}', " + + $"leaseContainer='{ContainerName}', leaseDatabase='{DatabaseName}', functionId='{this._functionId}'"; + _scaleMonitor = new CosmosDBScaleMonitor(_functionId, _loggerFactory.CreateLogger(), _monitoredContainer.Object, _leasesContainer.Object, ProcessorName); + } + + [Fact] + public void ScaleMonitorDescriptor_ReturnsExpectedValue() + { + Assert.Equal($"{_functionId}-cosmosdbtrigger-{DatabaseName}-{ContainerName}".ToLower(), _scaleMonitor.Descriptor.Id); + } + + [Fact] + public void GetScaleStatus_NoMetrics_ReturnsVote_None() + { + var context = new ScaleStatusContext + { + WorkerCount = 1 + }; + + var status = _scaleMonitor.GetScaleStatus(context); + Assert.Equal(ScaleVote.None, status.Vote); + + // verify the non-generic implementation works properly + status = ((CosmosDBScaleMonitor)_scaleMonitor).GetScaleStatus(context); + Assert.Equal(ScaleVote.None, status.Vote); + } + + [Fact] + public void GetScaleStatus_InstancesPerPartitionThresholdExceeded_ReturnsVote_ScaleIn() + { + var context = new ScaleStatusContext + { + WorkerCount = 2 + }; + var timestamp = DateTime.UtcNow; + context.Metrics = new List + { + new CosmosDBTriggerMetrics { RemainingWork = 2500, PartitionCount = 1, Timestamp = timestamp.AddSeconds(15) }, + new CosmosDBTriggerMetrics { RemainingWork = 2505, PartitionCount = 1, Timestamp = timestamp.AddSeconds(15) }, + new CosmosDBTriggerMetrics { RemainingWork = 2612, PartitionCount = 1, Timestamp = timestamp.AddSeconds(15) }, + new CosmosDBTriggerMetrics { RemainingWork = 2700, PartitionCount = 1, Timestamp = timestamp.AddSeconds(15) }, + new CosmosDBTriggerMetrics { RemainingWork = 2810, PartitionCount = 1, Timestamp = timestamp.AddSeconds(15) }, + new CosmosDBTriggerMetrics { RemainingWork = 2900, PartitionCount = 1, Timestamp = timestamp.AddSeconds(15) }, + }; + + var status = ((CosmosDBScaleMonitor)_scaleMonitor).GetScaleStatus(context); + Assert.Equal(ScaleVote.ScaleIn, status.Vote); + + var logs = _loggerProvider.GetAllLogMessages().ToArray(); + var log = logs[0]; + Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level); + Assert.Equal("WorkerCount (2) > PartitionCount (1).", log.FormattedMessage); + log = logs[1]; + Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level); + Assert.Equal($"Number of instances (2) is too high relative to number of partitions for container ({ContainerName}, 1).", log.FormattedMessage); + } + + [Fact] + public void GetScaleStatus_MessagesPerWorkerThresholdExceeded_ReturnsVote_ScaleOut() + { + var context = new ScaleStatusContext + { + WorkerCount = 1 + }; + var timestamp = DateTime.UtcNow; + context.Metrics = new List + { + new CosmosDBTriggerMetrics { RemainingWork = 2500, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, + new CosmosDBTriggerMetrics { RemainingWork = 2505, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, + new CosmosDBTriggerMetrics { RemainingWork = 2612, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, + new CosmosDBTriggerMetrics { RemainingWork = 2700, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, + new CosmosDBTriggerMetrics { RemainingWork = 2810, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, + new CosmosDBTriggerMetrics { RemainingWork = 2900, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, + }; + + var status = ((CosmosDBScaleMonitor)_scaleMonitor).GetScaleStatus(context); + Assert.Equal(ScaleVote.ScaleOut, status.Vote); + + var logs = _loggerProvider.GetAllLogMessages().ToArray(); + var log = logs[0]; + Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level); + Assert.Equal("RemainingWork (2900) > WorkerCount (1) * 1,000.", log.FormattedMessage); + log = logs[1]; + Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level); + Assert.Equal($"Remaining work for container ({ContainerName}, 2900) is too high relative to the number of instances (1).", log.FormattedMessage); + } + + [Fact] + public void GetScaleStatus_ConsistentRemainingWork_ReturnsVote_ScaleOut() + { + var context = new ScaleStatusContext + { + WorkerCount = 1 + }; + var timestamp = DateTime.UtcNow; + context.Metrics = new List + { + new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 2, Timestamp = timestamp.AddSeconds(15) }, + new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 2, Timestamp = timestamp.AddSeconds(15) }, + new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 2, Timestamp = timestamp.AddSeconds(15) }, + new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 2, Timestamp = timestamp.AddSeconds(15) }, + new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 2, Timestamp = timestamp.AddSeconds(15) }, + new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 2, Timestamp = timestamp.AddSeconds(15) }, + }; + + var status = ((CosmosDBScaleMonitor)_scaleMonitor).GetScaleStatus(context); + Assert.Equal(ScaleVote.ScaleOut, status.Vote); + + var logs = _loggerProvider.GetAllLogMessages().ToArray(); + var log = logs[0]; + Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level); + Assert.Equal($"CosmosDB container '{ContainerName}' has documents waiting to be processed.", log.FormattedMessage); + log = logs[1]; + Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level); + Assert.Equal("There are 1 instances relative to 2 partitions.", log.FormattedMessage); + } + + [Fact] + public void GetScaleStatus_RemainingWorkIncreasing_ReturnsVote_ScaleOut() + { + var context = new ScaleStatusContext + { + WorkerCount = 1 + }; + var timestamp = DateTime.UtcNow; + context.Metrics = new List + { + new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, + new CosmosDBTriggerMetrics { RemainingWork = 20, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, + new CosmosDBTriggerMetrics { RemainingWork = 40, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, + new CosmosDBTriggerMetrics { RemainingWork = 80, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, + new CosmosDBTriggerMetrics { RemainingWork = 100, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, + new CosmosDBTriggerMetrics { RemainingWork = 150, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, + }; + + var status = ((CosmosDBScaleMonitor)_scaleMonitor).GetScaleStatus(context); + Assert.Equal(ScaleVote.ScaleOut, status.Vote); + + var logs = _loggerProvider.GetAllLogMessages().ToArray(); + var log = logs[0]; + Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level); + Assert.Equal($"Remaining work is increasing for '{ContainerName}'.", log.FormattedMessage); + } + + [Fact] + public void GetScaleStatus_RemainingWorkDecreasing_ReturnsVote_ScaleIn() + { + var context = new ScaleStatusContext + { + WorkerCount = 1 + }; + var timestamp = DateTime.UtcNow; + context.Metrics = new List + { + new CosmosDBTriggerMetrics { RemainingWork = 150, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, + new CosmosDBTriggerMetrics { RemainingWork = 100, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, + new CosmosDBTriggerMetrics { RemainingWork = 80, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, + new CosmosDBTriggerMetrics { RemainingWork = 40, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, + new CosmosDBTriggerMetrics { RemainingWork = 20, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, + new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) }, + }; + + var status = ((CosmosDBScaleMonitor)_scaleMonitor).GetScaleStatus(context); + Assert.Equal(ScaleVote.ScaleIn, status.Vote); + + var logs = _loggerProvider.GetAllLogMessages().ToArray(); + var log = logs[0]; + Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level); + Assert.Equal($"Remaining work is decreasing for '{ContainerName}'.", log.FormattedMessage); + } + } +} From 45929762cafe856bb5318e1755155d1526c9581f Mon Sep 17 00:00:00 2001 From: Vincent Chiang Date: Mon, 21 Nov 2022 15:34:55 -0800 Subject: [PATCH 20/28] upgrade webjobs sdk version, use new scale monitor descriptor --- src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs | 2 +- .../WebJobs.Extensions.CosmosDB.csproj | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs index e2f673672..974993cbb 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs @@ -25,7 +25,7 @@ public CosmosDBScaleMonitor(string functionId, ILogger logger, Container monitor _logger = logger; _functionId = functionId; _monitoredContainer = monitoredContainer; - _scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{_functionId}-CosmosDBTrigger-{_monitoredContainer.Database.Id}-{_monitoredContainer.Id}".ToLower()); + _scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{_functionId}-CosmosDBTrigger-{_monitoredContainer.Database.Id}-{_monitoredContainer.Id}".ToLower(), _functionId); _cosmosDBMetricsProvider = new CosmosDBMetricsProvider(logger, monitoredContainer, leaseContainer, processorName); } diff --git a/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj b/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj index 632fc2f51..3f8ce73fd 100644 --- a/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj +++ b/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj @@ -20,7 +20,7 @@ - + From da2da07becc1d1c05ae8bc76d3dcb55ac680bd26 Mon Sep 17 00:00:00 2001 From: Vincent Chiang Date: Thu, 12 Jan 2023 12:49:42 -0500 Subject: [PATCH 21/28] make cosmosdbtargetscaler, scalemonitor, and scalemetrics public --- src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs | 2 +- src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs | 2 +- .../Trigger/CosmosDBTriggerMetrics.cs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs index 974993cbb..f2542f583 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs @@ -12,7 +12,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger { - internal class CosmosDBScaleMonitor : IScaleMonitor + public class CosmosDBScaleMonitor : IScaleMonitor { private readonly ILogger _logger; private readonly string _functionId; diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs index 352752572..540533f78 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs @@ -9,7 +9,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger { - internal class CosmosDBTargetScaler : ITargetScaler + public class CosmosDBTargetScaler : ITargetScaler { public const int DefaultMaxItemsPerInvocation = 100; private readonly string _functionId; diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerMetrics.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerMetrics.cs index 65c3c5d29..d45cbcb0e 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerMetrics.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerMetrics.cs @@ -5,7 +5,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB { - internal class CosmosDBTriggerMetrics : ScaleMetrics + public class CosmosDBTriggerMetrics : ScaleMetrics { public int PartitionCount { get; set; } From 40502823133f147ee8fa906fb729571033acd7c4 Mon Sep 17 00:00:00 2001 From: Vincent Chiang Date: Fri, 13 Jan 2023 10:47:08 -0500 Subject: [PATCH 22/28] use prerelease version for webjobs sdk, add xml docs to public classes --- .../Trigger/CosmosDBScaleMonitor.cs | 11 +++++++++++ .../Trigger/CosmosDBTargetScaler.cs | 14 +++++++++++++- .../Trigger/CosmosDBTriggerMetrics.cs | 3 +++ .../WebJobs.Extensions.CosmosDB.csproj | 2 +- 4 files changed, 28 insertions(+), 2 deletions(-) diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs index f2542f583..dec9cd5a4 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs @@ -12,6 +12,9 @@ namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger { + /// + /// Scale Monitor class for a CosmosDB function. + /// public class CosmosDBScaleMonitor : IScaleMonitor { private readonly ILogger _logger; @@ -20,6 +23,14 @@ public class CosmosDBScaleMonitor : IScaleMonitor private readonly ScaleMonitorDescriptor _scaleMonitorDescriptor; private readonly CosmosDBMetricsProvider _cosmosDBMetricsProvider; + /// + /// Instantiates a scale monitor for CosmosDB function. + /// + /// FunctionId of the monitored function. + /// Used for logging. + /// Monitored container for CosmosDB function. + /// Lease container for CosmosDB function. + /// Processor name used for function. public CosmosDBScaleMonitor(string functionId, ILogger logger, Container monitoredContainer, Container leaseContainer, string processorName) { _logger = logger; diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs index 540533f78..cd6b780d2 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs @@ -9,6 +9,9 @@ namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger { + /// + /// Target Scaler for a CosmosDB function. + /// public class CosmosDBTargetScaler : ITargetScaler { public const int DefaultMaxItemsPerInvocation = 100; @@ -19,7 +22,16 @@ public class CosmosDBTargetScaler : ITargetScaler private readonly CosmosDBTriggerAttribute _cosmosDBTriggerAttribute; private readonly Container _monitoredContainer; - internal CosmosDBTargetScaler(string functionId, CosmosDBTriggerAttribute cosmosDBTriggerAttribute, Container monitoredContainer, Container leaseContainer, string processorName, ILogger logger) + /// + /// Instantiates a target based scaler. + /// + /// FunctionId of the monitored function. + /// Trigger attribute used for target based scaling. + /// Monitored container for CosmosDB function. + /// Lease container for CosmosDB function. + /// Processor name used for function. + /// Used for logging. + public CosmosDBTargetScaler(string functionId, CosmosDBTriggerAttribute cosmosDBTriggerAttribute, Container monitoredContainer, Container leaseContainer, string processorName, ILogger logger) { _functionId = functionId; _targetScalerDescriptor = new TargetScalerDescriptor(functionId); diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerMetrics.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerMetrics.cs index d45cbcb0e..9ddd56d08 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerMetrics.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerMetrics.cs @@ -5,6 +5,9 @@ namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB { + /// + /// Metrics used to make scaling decisions for a CosmosDB function. + /// public class CosmosDBTriggerMetrics : ScaleMetrics { public int PartitionCount { get; set; } diff --git a/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj b/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj index 3f8ce73fd..7afa814fe 100644 --- a/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj +++ b/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj @@ -20,7 +20,7 @@ - + From 1f5e7b6ec2815355f582206a89a4a78a4d41860b Mon Sep 17 00:00:00 2001 From: Vincent Chiang Date: Tue, 17 Jan 2023 15:05:33 -0500 Subject: [PATCH 23/28] Revert "use prerelease version for webjobs sdk, add xml docs to public classes" This reverts commit 40502823133f147ee8fa906fb729571033acd7c4. --- .../Trigger/CosmosDBScaleMonitor.cs | 11 ----------- .../Trigger/CosmosDBTargetScaler.cs | 14 +------------- .../Trigger/CosmosDBTriggerMetrics.cs | 3 --- .../WebJobs.Extensions.CosmosDB.csproj | 2 +- 4 files changed, 2 insertions(+), 28 deletions(-) diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs index dec9cd5a4..f2542f583 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs @@ -12,9 +12,6 @@ namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger { - /// - /// Scale Monitor class for a CosmosDB function. - /// public class CosmosDBScaleMonitor : IScaleMonitor { private readonly ILogger _logger; @@ -23,14 +20,6 @@ public class CosmosDBScaleMonitor : IScaleMonitor private readonly ScaleMonitorDescriptor _scaleMonitorDescriptor; private readonly CosmosDBMetricsProvider _cosmosDBMetricsProvider; - /// - /// Instantiates a scale monitor for CosmosDB function. - /// - /// FunctionId of the monitored function. - /// Used for logging. - /// Monitored container for CosmosDB function. - /// Lease container for CosmosDB function. - /// Processor name used for function. public CosmosDBScaleMonitor(string functionId, ILogger logger, Container monitoredContainer, Container leaseContainer, string processorName) { _logger = logger; diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs index cd6b780d2..540533f78 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs @@ -9,9 +9,6 @@ namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger { - /// - /// Target Scaler for a CosmosDB function. - /// public class CosmosDBTargetScaler : ITargetScaler { public const int DefaultMaxItemsPerInvocation = 100; @@ -22,16 +19,7 @@ public class CosmosDBTargetScaler : ITargetScaler private readonly CosmosDBTriggerAttribute _cosmosDBTriggerAttribute; private readonly Container _monitoredContainer; - /// - /// Instantiates a target based scaler. - /// - /// FunctionId of the monitored function. - /// Trigger attribute used for target based scaling. - /// Monitored container for CosmosDB function. - /// Lease container for CosmosDB function. - /// Processor name used for function. - /// Used for logging. - public CosmosDBTargetScaler(string functionId, CosmosDBTriggerAttribute cosmosDBTriggerAttribute, Container monitoredContainer, Container leaseContainer, string processorName, ILogger logger) + internal CosmosDBTargetScaler(string functionId, CosmosDBTriggerAttribute cosmosDBTriggerAttribute, Container monitoredContainer, Container leaseContainer, string processorName, ILogger logger) { _functionId = functionId; _targetScalerDescriptor = new TargetScalerDescriptor(functionId); diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerMetrics.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerMetrics.cs index 9ddd56d08..d45cbcb0e 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerMetrics.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerMetrics.cs @@ -5,9 +5,6 @@ namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB { - /// - /// Metrics used to make scaling decisions for a CosmosDB function. - /// public class CosmosDBTriggerMetrics : ScaleMetrics { public int PartitionCount { get; set; } diff --git a/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj b/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj index 7afa814fe..3f8ce73fd 100644 --- a/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj +++ b/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj @@ -20,7 +20,7 @@ - + From e280e40ff517cf8aa5356d6eac35558f6e221e67 Mon Sep 17 00:00:00 2001 From: Vincent Chiang Date: Tue, 17 Jan 2023 15:06:54 -0500 Subject: [PATCH 24/28] Revert "Revert "use prerelease version for webjobs sdk, add xml docs to public classes"" This reverts commit 1f5e7b6ec2815355f582206a89a4a78a4d41860b. --- .../Trigger/CosmosDBScaleMonitor.cs | 11 +++++++++++ .../Trigger/CosmosDBTargetScaler.cs | 14 +++++++++++++- .../Trigger/CosmosDBTriggerMetrics.cs | 3 +++ .../WebJobs.Extensions.CosmosDB.csproj | 2 +- 4 files changed, 28 insertions(+), 2 deletions(-) diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs index f2542f583..dec9cd5a4 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs @@ -12,6 +12,9 @@ namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger { + /// + /// Scale Monitor class for a CosmosDB function. + /// public class CosmosDBScaleMonitor : IScaleMonitor { private readonly ILogger _logger; @@ -20,6 +23,14 @@ public class CosmosDBScaleMonitor : IScaleMonitor private readonly ScaleMonitorDescriptor _scaleMonitorDescriptor; private readonly CosmosDBMetricsProvider _cosmosDBMetricsProvider; + /// + /// Instantiates a scale monitor for CosmosDB function. + /// + /// FunctionId of the monitored function. + /// Used for logging. + /// Monitored container for CosmosDB function. + /// Lease container for CosmosDB function. + /// Processor name used for function. public CosmosDBScaleMonitor(string functionId, ILogger logger, Container monitoredContainer, Container leaseContainer, string processorName) { _logger = logger; diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs index 540533f78..cd6b780d2 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs @@ -9,6 +9,9 @@ namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger { + /// + /// Target Scaler for a CosmosDB function. + /// public class CosmosDBTargetScaler : ITargetScaler { public const int DefaultMaxItemsPerInvocation = 100; @@ -19,7 +22,16 @@ public class CosmosDBTargetScaler : ITargetScaler private readonly CosmosDBTriggerAttribute _cosmosDBTriggerAttribute; private readonly Container _monitoredContainer; - internal CosmosDBTargetScaler(string functionId, CosmosDBTriggerAttribute cosmosDBTriggerAttribute, Container monitoredContainer, Container leaseContainer, string processorName, ILogger logger) + /// + /// Instantiates a target based scaler. + /// + /// FunctionId of the monitored function. + /// Trigger attribute used for target based scaling. + /// Monitored container for CosmosDB function. + /// Lease container for CosmosDB function. + /// Processor name used for function. + /// Used for logging. + public CosmosDBTargetScaler(string functionId, CosmosDBTriggerAttribute cosmosDBTriggerAttribute, Container monitoredContainer, Container leaseContainer, string processorName, ILogger logger) { _functionId = functionId; _targetScalerDescriptor = new TargetScalerDescriptor(functionId); diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerMetrics.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerMetrics.cs index d45cbcb0e..9ddd56d08 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerMetrics.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerMetrics.cs @@ -5,6 +5,9 @@ namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB { + /// + /// Metrics used to make scaling decisions for a CosmosDB function. + /// public class CosmosDBTriggerMetrics : ScaleMetrics { public int PartitionCount { get; set; } diff --git a/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj b/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj index 3f8ce73fd..7afa814fe 100644 --- a/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj +++ b/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj @@ -20,7 +20,7 @@ - + From 158bd9e769188fe72d5dddd2f77041064c909ac9 Mon Sep 17 00:00:00 2001 From: Vincent Chiang Date: Tue, 17 Jan 2023 15:22:04 -0500 Subject: [PATCH 25/28] make types internal --- .../Trigger/CosmosDBScaleMonitor.cs | 5 +---- .../Trigger/CosmosDBTargetScaler.cs | 14 +------------- .../Trigger/CosmosDBTriggerMetrics.cs | 5 +---- 3 files changed, 3 insertions(+), 21 deletions(-) diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs index dec9cd5a4..bb1096621 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs @@ -12,10 +12,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger { - /// - /// Scale Monitor class for a CosmosDB function. - /// - public class CosmosDBScaleMonitor : IScaleMonitor + internal class CosmosDBScaleMonitor : IScaleMonitor { private readonly ILogger _logger; private readonly string _functionId; diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs index cd6b780d2..d93cd86f1 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs @@ -9,10 +9,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger { - /// - /// Target Scaler for a CosmosDB function. - /// - public class CosmosDBTargetScaler : ITargetScaler + internal class CosmosDBTargetScaler : ITargetScaler { public const int DefaultMaxItemsPerInvocation = 100; private readonly string _functionId; @@ -22,15 +19,6 @@ public class CosmosDBTargetScaler : ITargetScaler private readonly CosmosDBTriggerAttribute _cosmosDBTriggerAttribute; private readonly Container _monitoredContainer; - /// - /// Instantiates a target based scaler. - /// - /// FunctionId of the monitored function. - /// Trigger attribute used for target based scaling. - /// Monitored container for CosmosDB function. - /// Lease container for CosmosDB function. - /// Processor name used for function. - /// Used for logging. public CosmosDBTargetScaler(string functionId, CosmosDBTriggerAttribute cosmosDBTriggerAttribute, Container monitoredContainer, Container leaseContainer, string processorName, ILogger logger) { _functionId = functionId; diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerMetrics.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerMetrics.cs index 9ddd56d08..65c3c5d29 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerMetrics.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerMetrics.cs @@ -5,10 +5,7 @@ namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB { - /// - /// Metrics used to make scaling decisions for a CosmosDB function. - /// - public class CosmosDBTriggerMetrics : ScaleMetrics + internal class CosmosDBTriggerMetrics : ScaleMetrics { public int PartitionCount { get; set; } From 38fe7b5cde378e8b8217735cbd1b7ac4072e162b Mon Sep 17 00:00:00 2001 From: Vincent Chiang Date: Wed, 18 Jan 2023 11:35:24 -0500 Subject: [PATCH 26/28] create cosmosdbscalerfactory --- .../Trigger/CosmosDBScaleMonitor.cs | 2 +- .../Trigger/CosmosDBScalerFactory.cs | 88 +++++++++++++++++++ .../Trigger/CosmosDBTriggerListener.cs | 2 +- 3 files changed, 90 insertions(+), 2 deletions(-) create mode 100644 src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScalerFactory.cs diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs index bb1096621..d0a84bb29 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs @@ -28,7 +28,7 @@ internal class CosmosDBScaleMonitor : IScaleMonitor /// Monitored container for CosmosDB function. /// Lease container for CosmosDB function. /// Processor name used for function. - public CosmosDBScaleMonitor(string functionId, ILogger logger, Container monitoredContainer, Container leaseContainer, string processorName) + public CosmosDBScaleMonitor(string functionId, Container monitoredContainer, Container leaseContainer, string processorName, ILogger logger) { _logger = logger; _functionId = functionId; diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScalerFactory.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScalerFactory.cs new file mode 100644 index 000000000..6f137e342 --- /dev/null +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScalerFactory.cs @@ -0,0 +1,88 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. +using System; +using System.Linq; +using Microsoft.Azure.Cosmos; +using Microsoft.Azure.WebJobs.Host.Scale; +using Microsoft.Extensions.Azure; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json; + +namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger +{ + internal class CosmosDBScalerFactory : IScalerFactory + { + private AzureComponentFactory _componentFactory; + + public CosmosDBScalerFactory(AzureComponentFactory componentFactory) + { + _componentFactory = componentFactory; + } + + public IScaleMonitor CreateScalerMonitor(ScalerContext context) + { + (string functionId, Container container, Container leaseContainer, CosmosDBTriggerAttribute attribute, string leaseContainerPrefix, ILoggerFactory loggerFactory) = CreateParameters(context); + ILogger logger = loggerFactory.CreateLogger(); + return new CosmosDBScaleMonitor(functionId, container, leaseContainer, leaseContainerPrefix, logger); + } + + public ITargetScaler CreateTargetScaler(ScalerContext context) + { + (string functionId, Container container, Container leaseContainer, CosmosDBTriggerAttribute attribute, string leaseContainerPrefix, ILoggerFactory loggerFactory) = CreateParameters(context); + ILogger logger = loggerFactory.CreateLogger(); + + return new CosmosDBTargetScaler(functionId, attribute, container, leaseContainer, leaseContainerPrefix, logger); + } + + private (string FunctionId, Container Container, Container LeaseContainer, CosmosDBTriggerAttribute CosmosDBTriggerAttribute, string LeaseContainerPrefix, ILoggerFactory LoggerFactory) CreateParameters(ScalerContext scalerContext) + { + DefaultCosmosDBServiceFactory serviceFactory = new DefaultCosmosDBServiceFactory(scalerContext.Configration, _componentFactory); + INameResolver resolver = new DefaultNameResolver(scalerContext.Configration); + + TriggerData[] allTriggers = JsonConvert.DeserializeObject(scalerContext.TriggerData); + TriggerData targetTrigger = allTriggers.SingleOrDefault(x => x.FunctionName == scalerContext.FunctionId); + + CosmosClientOptions options = new CosmosClientOptions(); + CosmosDBTriggerAttribute attribute = new CosmosDBTriggerAttribute(targetTrigger.DatabaseName, targetTrigger.ContainerName) { MaxItemsPerInvocation = targetTrigger.MaxItemsPerInvocation }; + + Container container = serviceFactory.CreateService(targetTrigger.Connection, options).GetContainer(targetTrigger.DatabaseName, targetTrigger.ContainerName); + Container leaseContainer = serviceFactory.CreateService(targetTrigger.LeaseConnection, options).GetContainer(targetTrigger.DatabaseName, targetTrigger.LeaseContainerName); + + string functionId = targetTrigger.FunctionName; + string leaseContainerPrefix = targetTrigger.LeaseContainerPrefix; + ILoggerFactory loggerFactory = scalerContext.LoggerFactory; + + return (FunctionId: functionId, Container: container, LeaseContainer: leaseContainer, CosmosDBTriggerAttribute: attribute, LeaseContainerPrefix: leaseContainerPrefix, LoggerFactory: loggerFactory); + } + + // Taken from: https://github.com/Azure/azure-webjobs-sdk-extensions/blob/dev/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerAttribute.cs + // Extracting the attributes necessary to make scale decisions. + [JsonObject] + private class TriggerData + { + [JsonProperty] + public string FunctionName { get; set; } + + [JsonProperty] + public string DatabaseName { get; set; } + + [JsonProperty] + public string Connection { get; set; } + + [JsonProperty] + public string ContainerName { get; set; } + + [JsonProperty] + public string LeaseConnection { get; set; } + + [JsonProperty] + public string LeaseContainerName { get; set; } + + [JsonProperty] + public int MaxItemsPerInvocation { get; set; } + + [JsonProperty] + public string LeaseContainerPrefix { get; set; } + } + } +} diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs index 48ca72b22..d1e8924e4 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs @@ -60,7 +60,7 @@ public CosmosDBTriggerListener( this._listenerLogDetails = $"prefix='{this._processorName}', monitoredContainer='{this._monitoredContainer.Id}', monitoredDatabase='{this._monitoredContainer.Database.Id}', " + $"leaseContainer='{this._leaseContainer.Id}', leaseDatabase='{this._leaseContainer.Database.Id}', functionId='{this._functionId}'"; - this._cosmosDBScaleMonitor = new CosmosDBScaleMonitor(_functionId, logger, _monitoredContainer, _leaseContainer, _processorName); + this._cosmosDBScaleMonitor = new CosmosDBScaleMonitor(_functionId, _monitoredContainer, _leaseContainer, _processorName, _logger); this._cosmosDBTargetScaler = new CosmosDBTargetScaler(_functionId, _cosmosDBAttribute, _monitoredContainer, _leaseContainer, _processorName, _logger); } From cf3c343d86f6e5b20d055aa86aac1715c3f3a4e2 Mon Sep 17 00:00:00 2001 From: Vincent Chiang Date: Wed, 18 Jan 2023 11:36:25 -0500 Subject: [PATCH 27/28] move ilogger to back of scalemonitor constructor --- .../Trigger/CosmosDBScalerFactory.cs | 1 - .../Trigger/CosmosDBScaleMonitorTests.cs | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScalerFactory.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScalerFactory.cs index 6f137e342..70caf4ecc 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScalerFactory.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScalerFactory.cs @@ -1,6 +1,5 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. -using System; using System.Linq; using Microsoft.Azure.Cosmos; using Microsoft.Azure.WebJobs.Host.Scale; diff --git a/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBScaleMonitorTests.cs b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBScaleMonitorTests.cs index fa436bc8f..7a50f49e2 100644 --- a/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBScaleMonitorTests.cs +++ b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBScaleMonitorTests.cs @@ -69,7 +69,7 @@ public CosmosDBScaleMonitorTests() _logDetails = $"prefix='{ProcessorName}', monitoredContainer='{ContainerName}', monitoredDatabase='{DatabaseName}', " + $"leaseContainer='{ContainerName}', leaseDatabase='{DatabaseName}', functionId='{this._functionId}'"; - _scaleMonitor = new CosmosDBScaleMonitor(_functionId, _loggerFactory.CreateLogger(), _monitoredContainer.Object, _leasesContainer.Object, ProcessorName); + _scaleMonitor = new CosmosDBScaleMonitor(_functionId, _monitoredContainer.Object, _leasesContainer.Object, ProcessorName, _loggerFactory.CreateLogger()); } [Fact] From 87dc56e634080c6f9f1e9ba29dba80299b07df85 Mon Sep 17 00:00:00 2001 From: Vincent Chiang Date: Thu, 19 Jan 2023 13:17:26 -0500 Subject: [PATCH 28/28] add method to validate and process trigger data --- .../Trigger/CosmosDBScalerFactory.cs | 33 ++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScalerFactory.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScalerFactory.cs index 70caf4ecc..5ffb5821a 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScalerFactory.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScalerFactory.cs @@ -1,5 +1,6 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. +using System; using System.Linq; using Microsoft.Azure.Cosmos; using Microsoft.Azure.WebJobs.Host.Scale; @@ -40,6 +41,7 @@ public ITargetScaler CreateTargetScaler(ScalerContext context) TriggerData[] allTriggers = JsonConvert.DeserializeObject(scalerContext.TriggerData); TriggerData targetTrigger = allTriggers.SingleOrDefault(x => x.FunctionName == scalerContext.FunctionId); + targetTrigger = ValidateAndProcessTriggerData(targetTrigger); CosmosClientOptions options = new CosmosClientOptions(); CosmosDBTriggerAttribute attribute = new CosmosDBTriggerAttribute(targetTrigger.DatabaseName, targetTrigger.ContainerName) { MaxItemsPerInvocation = targetTrigger.MaxItemsPerInvocation }; @@ -54,8 +56,37 @@ public ITargetScaler CreateTargetScaler(ScalerContext context) return (FunctionId: functionId, Container: container, LeaseContainer: leaseContainer, CosmosDBTriggerAttribute: attribute, LeaseContainerPrefix: leaseContainerPrefix, LoggerFactory: loggerFactory); } + private TriggerData ValidateAndProcessTriggerData(TriggerData triggerData) + { + // Validate required arguments + if (triggerData == null) + { + throw new ArgumentNullException(nameof(triggerData)); + } + else if (string.IsNullOrEmpty(triggerData.Connection)) + { + throw new ArgumentNullException(nameof(triggerData.Connection)); + } + else if (string.IsNullOrEmpty(triggerData.DatabaseName)) + { + throw new ArgumentNullException(nameof(triggerData.DatabaseName)); + } + else if (string.IsNullOrEmpty(triggerData.ContainerName)) + { + throw new ArgumentNullException(nameof(triggerData.ContainerName)); + } + + // Populate optional arguments + triggerData.LeaseConnection = triggerData.LeaseConnection ?? triggerData.Connection; + triggerData.LeaseContainerName = triggerData.LeaseContainerName ?? "leases"; + triggerData.LeaseContainerPrefix = triggerData.LeaseContainerPrefix ?? string.Empty; + + return triggerData; + } + // Taken from: https://github.com/Azure/azure-webjobs-sdk-extensions/blob/dev/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerAttribute.cs - // Extracting the attributes necessary to make scale decisions. + // Extracting only the attributes necessary to make scale decisions. + // Assumes the attributes have already been resolved if defined as an environment variable [JsonObject] private class TriggerData {