diff --git a/appveyor.yml b/appveyor.yml
index 4a1437273..1fc0a6460 100644
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -18,7 +18,7 @@ install:
- ps: >
$env:CommitHash = "$env:APPVEYOR_REPO_COMMIT"
- .\dotnet-install.ps1 -Version 2.1.300 -Architecture x86
+ .\dotnet-install.ps1 -Version 3.1.300 -Architecture x86
build_script:
- ps: |
$buildNumber = 0
diff --git a/src/ExtensionsSample/ExtensionsSample.csproj b/src/ExtensionsSample/ExtensionsSample.csproj
index b1831bdc5..3cb94854b 100644
--- a/src/ExtensionsSample/ExtensionsSample.csproj
+++ b/src/ExtensionsSample/ExtensionsSample.csproj
@@ -2,7 +2,7 @@
<OutputType>Exe</OutputType>
- <TargetFramework>netcoreapp2.1</TargetFramework>
+ <TargetFramework>netcoreapp3.1</TargetFramework>
<LangVersion>latest</LangVersion>
<NoWarn>$(NoWarn);8002</NoWarn>
diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBMetricsProvider.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBMetricsProvider.cs
new file mode 100644
index 000000000..d1d58e7d0
--- /dev/null
+++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBMetricsProvider.cs
@@ -0,0 +1,134 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Threading.Tasks;
+using Microsoft.Azure.Cosmos;
+using Microsoft.Extensions.Logging;
+
+namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger
+{
+ internal class CosmosDBMetricsProvider
+ {
+ private readonly ILogger _logger;
+ private readonly Container _monitoredContainer;
+ private readonly Container _leaseContainer;
+ private readonly string _processorName;
+
+ private static readonly Dictionary<string, string> KnownDocumentClientErrors = new Dictionary<string, string>()
+ {
+ { "Resource Not Found", "Please check that the CosmosDB container and leases container exist and are listed correctly in Functions config files." },
+ { "The input authorization token can't serve the request", string.Empty },
+ { "The MAC signature found in the HTTP request is not the same", string.Empty },
+ { "Service is currently unavailable.", string.Empty },
+ { "Entity with the specified id does not exist in the system.", string.Empty },
+ { "Subscription owning the database account is disabled.", string.Empty },
+ { "Request rate is large", string.Empty },
+ { "The remote name could not be resolved:", string.Empty },
+ { "Owner resource does not exist", string.Empty },
+ { "The specified document collection is invalid", string.Empty }
+ };
+
+ public CosmosDBMetricsProvider(ILogger logger, Container monitoredContainer, Container leaseContainer, string processorName)
+ {
+ _logger = logger;
+ _monitoredContainer = monitoredContainer;
+ _leaseContainer = leaseContainer;
+ _processorName = processorName;
+ }
+
+ public async Task<CosmosDBTriggerMetrics> GetMetricsAsync()
+ {
+ int partitionCount = 0;
+ long remainingWork = 0;
+
+ try
+ {
+ List<ChangeFeedProcessorState> partitionWorkList = new List<ChangeFeedProcessorState>();
+ ChangeFeedEstimator estimator = _monitoredContainer.GetChangeFeedEstimator(_processorName, _leaseContainer);
+ using (FeedIterator<ChangeFeedProcessorState> iterator = estimator.GetCurrentStateIterator())
+ {
+ while (iterator.HasMoreResults)
+ {
+ FeedResponse<ChangeFeedProcessorState> response = await iterator.ReadNextAsync();
+ partitionWorkList.AddRange(response);
+ }
+ }
+
+ partitionCount = partitionWorkList.Count;
+ remainingWork = partitionWorkList.Sum(item => item.EstimatedLag);
+ }
+ catch (Exception e) when (e is CosmosException || e is InvalidOperationException)
+ {
+ if (!TryHandleCosmosException(e))
+ {
+ _logger.LogWarning(Events.OnScaling, "Unable to handle {0}: {1}", e.GetType().ToString(), e.Message);
+ if (e is InvalidOperationException)
+ {
+ throw;
+ }
+ }
+ }
+ catch (System.Net.Http.HttpRequestException e)
+ {
+ string errormsg;
+
+ var webException = e.InnerException as WebException;
+ if (webException != null &&
+ webException.Status == WebExceptionStatus.ProtocolError)
+ {
+ string statusCode = ((HttpWebResponse)webException.Response).StatusCode.ToString();
+ string statusDesc = ((HttpWebResponse)webException.Response).StatusDescription;
+ errormsg = string.Format("CosmosDBTrigger status {0}: {1}.", statusCode, statusDesc);
+ }
+ else if (webException != null &&
+ webException.Status == WebExceptionStatus.NameResolutionFailure)
+ {
+ errormsg = string.Format("CosmosDBTrigger Exception message: {0}.", webException.Message);
+ }
+ else
+ {
+ errormsg = e.ToString();
+ }
+
+ _logger.LogWarning(Events.OnScaling, errormsg);
+ }
+
+ return new CosmosDBTriggerMetrics
+ {
+ Timestamp = DateTime.UtcNow,
+ PartitionCount = partitionCount,
+ RemainingWork = remainingWork
+ };
+ }
+
+ // Since all exceptions in the Cosmos client are thrown as CosmosExceptions, we have to parse their error strings because we dont have access to the internal types
+ private bool TryHandleCosmosException(Exception exception)
+ {
+ string errormsg = null;
+ string exceptionMessage = exception.Message;
+
+ if (!string.IsNullOrEmpty(exceptionMessage))
+ {
+ foreach (KeyValuePair<string, string> exceptionString in KnownDocumentClientErrors)
+ {
+ if (exceptionMessage.IndexOf(exceptionString.Key, StringComparison.OrdinalIgnoreCase) >= 0)
+ {
+ errormsg = !string.IsNullOrEmpty(exceptionString.Value) ? exceptionString.Value : exceptionMessage;
+ }
+ }
+ }
+
+ if (!string.IsNullOrEmpty(errormsg))
+ {
+ _logger.LogWarning(errormsg);
+ return true;
+ }
+
+ return false;
+ }
+ }
+}
diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs
new file mode 100644
index 000000000..bb1096621
--- /dev/null
+++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs
@@ -0,0 +1,174 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading.Tasks;
+using Microsoft.Azure.Cosmos;
+using Microsoft.Azure.WebJobs.Host.Scale;
+using Microsoft.Extensions.Logging;
+
+namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger
+{
+ internal class CosmosDBScaleMonitor : IScaleMonitor<CosmosDBTriggerMetrics>
+ {
+ private readonly ILogger _logger;
+ private readonly string _functionId;
+ private readonly Container _monitoredContainer;
+ private readonly ScaleMonitorDescriptor _scaleMonitorDescriptor;
+ private readonly CosmosDBMetricsProvider _cosmosDBMetricsProvider;
+
+ /// <summary>
+ /// Instantiates a scale monitor for CosmosDB function.
+ /// </summary>
+ /// <param name="functionId">FunctionId of the monitored function.</param>
+ /// <param name="logger">Used for logging.</param>
+ /// <param name="monitoredContainer">Monitored container for CosmosDB function.</param>
+ /// <param name="leaseContainer">Lease container for CosmosDB function.</param>
+ /// <param name="processorName">Processor name used for function.</param>
+ public CosmosDBScaleMonitor(string functionId, ILogger logger, Container monitoredContainer, Container leaseContainer, string processorName)
+ {
+ _logger = logger;
+ _functionId = functionId;
+ _monitoredContainer = monitoredContainer;
+ _scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{_functionId}-CosmosDBTrigger-{_monitoredContainer.Database.Id}-{_monitoredContainer.Id}".ToLower(), _functionId);
+ _cosmosDBMetricsProvider = new CosmosDBMetricsProvider(logger, monitoredContainer, leaseContainer, processorName);
+ }
+
+ public ScaleMonitorDescriptor Descriptor => _scaleMonitorDescriptor;
+
+ public ScaleStatus GetScaleStatus(ScaleStatusContext<CosmosDBTriggerMetrics> context)
+ {
+ return GetScaleStatusCore(context.WorkerCount, context.Metrics?.ToArray());
+ }
+
+ async Task<ScaleMetrics> IScaleMonitor.GetMetricsAsync()
+ {
+ return await GetMetricsAsync().ConfigureAwait(false);
+ }
+
+ public async Task<CosmosDBTriggerMetrics> GetMetricsAsync()
+ {
+ return await _cosmosDBMetricsProvider.GetMetricsAsync().ConfigureAwait(false);
+ }
+
+ public ScaleStatus GetScaleStatus(ScaleStatusContext context)
+ {
+ CosmosDBTriggerMetrics[] cosmosMetrics = context.Metrics?.Cast<CosmosDBTriggerMetrics>().ToArray();
+
+ return GetScaleStatusCore(context.WorkerCount, cosmosMetrics);
+ }
+
+ private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[] metrics)
+ {
+ ScaleStatus status = new ScaleStatus
+ {
+ Vote = ScaleVote.None
+ };
+
+ const int NumberOfSamplesToConsider = 5;
+
+ // Unable to determine the correct vote with no metrics.
+ if (metrics == null)
+ {
+ return status;
+ }
+
+ // We shouldn't assign more workers than there are partitions (Cosmos DB, Event Hub, Service Bus Queue/Topic)
+ // This check is first, because it is independent of load or number of samples.
+ int partitionCount = metrics.Length > 0 ? metrics.Last().PartitionCount : 0;
+ if (partitionCount > 0 && partitionCount < workerCount)
+ {
+ status.Vote = ScaleVote.ScaleIn;
+ _logger.LogInformation(Events.OnScaling, string.Format($"WorkerCount ({workerCount}) > PartitionCount ({partitionCount})."));
+ _logger.LogInformation(Events.OnScaling, string.Format($"Number of instances ({workerCount}) is too high relative to number " +
+ $"of partitions for container ({_monitoredContainer.Id}, {partitionCount})."));
+ return status;
+ }
+
+ // At least 5 samples are required to make a scale decision for the rest of the checks.
+ if (metrics.Length < NumberOfSamplesToConsider)
+ {
+ return status;
+ }
+
+ // Maintain a minimum ratio of 1 worker per 1,000 items of remaining work.
+ long latestRemainingWork = metrics.Last().RemainingWork;
+ if (latestRemainingWork > workerCount * 1000)
+ {
+ status.Vote = ScaleVote.ScaleOut;
+ _logger.LogInformation(Events.OnScaling, string.Format($"RemainingWork ({latestRemainingWork}) > WorkerCount ({workerCount}) * 1,000."));
+ _logger.LogInformation(Events.OnScaling, string.Format($"Remaining work for container ({_monitoredContainer.Id}, {latestRemainingWork}) " +
+ $"is too high relative to the number of instances ({workerCount})."));
+ return status;
+ }
+
+ bool documentsWaiting = metrics.All(m => m.RemainingWork > 0);
+ if (documentsWaiting && partitionCount > 0 && partitionCount > workerCount)
+ {
+ status.Vote = ScaleVote.ScaleOut;
+ _logger.LogInformation(Events.OnScaling, string.Format($"CosmosDB container '{_monitoredContainer.Id}' has documents waiting to be processed."));
+ _logger.LogInformation(Events.OnScaling, string.Format($"There are {workerCount} instances relative to {partitionCount} partitions."));
+ return status;
+ }
+
+ // Check to see if the trigger source has been empty for a while. Only if all trigger sources are empty do we scale down.
+ bool isIdle = metrics.All(m => m.RemainingWork == 0);
+ if (isIdle)
+ {
+ status.Vote = ScaleVote.ScaleIn;
+ _logger.LogInformation(Events.OnScaling, string.Format($"'{_monitoredContainer.Id}' is idle."));
+ return status;
+ }
+
+ // Samples are in chronological order. Check for a continuous increase in work remaining.
+ // If detected, this results in an automatic scale out for the site container.
+ bool remainingWorkIncreasing =
+ IsTrueForLast(
+ metrics,
+ NumberOfSamplesToConsider,
+ (prev, next) => prev.RemainingWork < next.RemainingWork) && metrics[0].RemainingWork > 0;
+ if (remainingWorkIncreasing)
+ {
+ status.Vote = ScaleVote.ScaleOut;
+ _logger.LogInformation(Events.OnScaling, $"Remaining work is increasing for '{_monitoredContainer.Id}'.");
+ return status;
+ }
+
+ bool remainingWorkDecreasing =
+ IsTrueForLast(
+ metrics,
+ NumberOfSamplesToConsider,
+ (prev, next) => prev.RemainingWork > next.RemainingWork);
+ if (remainingWorkDecreasing)
+ {
+ status.Vote = ScaleVote.ScaleIn;
+ _logger.LogInformation(Events.OnScaling, $"Remaining work is decreasing for '{_monitoredContainer.Id}'.");
+ return status;
+ }
+
+ _logger.LogInformation(Events.OnScaling, $"CosmosDB container '{_monitoredContainer.Id}' is steady.");
+
+ return status;
+ }
+
+ private static bool IsTrueForLast(IList<CosmosDBTriggerMetrics> metrics, int count, Func<CosmosDBTriggerMetrics, CosmosDBTriggerMetrics, bool> predicate)
+ {
+ Debug.Assert(count > 1, "count must be greater than 1.");
+ Debug.Assert(count <= metrics.Count, "count must be less than or equal to the list size.");
+
+ // Walks through the list from left to right starting at len(samples) - count.
+ for (int i = metrics.Count - count; i < metrics.Count - 1; i++)
+ {
+ if (!predicate(metrics[i], metrics[i + 1]))
+ {
+ return false;
+ }
+ }
+
+ return true;
+ }
+ }
+}
diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs
new file mode 100644
index 000000000..d93cd86f1
--- /dev/null
+++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs
@@ -0,0 +1,78 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+using System;
+using System.Threading.Tasks;
+using Microsoft.Azure.Cosmos;
+using Microsoft.Azure.WebJobs.Host.Scale;
+using Microsoft.Extensions.Logging;
+
+namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger
+{
+ internal class CosmosDBTargetScaler : ITargetScaler
+ {
+ public const int DefaultMaxItemsPerInvocation = 100;
+ private readonly string _functionId;
+ private readonly TargetScalerDescriptor _targetScalerDescriptor;
+ private readonly CosmosDBMetricsProvider _cosmosDBMetricsProvider;
+ private readonly ILogger _logger;
+ private readonly CosmosDBTriggerAttribute _cosmosDBTriggerAttribute;
+ private readonly Container _monitoredContainer;
+
+ public CosmosDBTargetScaler(string functionId, CosmosDBTriggerAttribute cosmosDBTriggerAttribute, Container monitoredContainer, Container leaseContainer, string processorName, ILogger logger)
+ {
+ _functionId = functionId;
+ _targetScalerDescriptor = new TargetScalerDescriptor(functionId);
+ _monitoredContainer = monitoredContainer;
+ _cosmosDBMetricsProvider = new CosmosDBMetricsProvider(logger, _monitoredContainer, leaseContainer, processorName);
+ _logger = logger;
+ _cosmosDBTriggerAttribute = cosmosDBTriggerAttribute;
+ }
+
+ public TargetScalerDescriptor TargetScalerDescriptor => _targetScalerDescriptor;
+
+ public async Task<TargetScalerResult> GetScaleResultAsync(TargetScalerContext context)
+ {
+ CosmosDBTriggerMetrics metrics = await _cosmosDBMetricsProvider.GetMetricsAsync();
+
+ return GetScaleResultInternal(context, metrics.RemainingWork, metrics.PartitionCount);
+ }
+
+ internal TargetScalerResult GetScaleResultInternal(TargetScalerContext context, long remainingWork, int partitionCount)
+ {
+ int concurrency;
+
+ if (!context.InstanceConcurrency.HasValue)
+ {
+ concurrency = _cosmosDBTriggerAttribute.MaxItemsPerInvocation > 0 ? _cosmosDBTriggerAttribute.MaxItemsPerInvocation : DefaultMaxItemsPerInvocation;
+ }
+ else
+ {
+ concurrency = context.InstanceConcurrency.Value;
+ }
+
+ if (concurrency <= 0)
+ {
+ _logger.LogWarning($"Concurrency value for target based scale must be greater than 0. Using default value of {DefaultMaxItemsPerInvocation} as concurrency value.");
+ concurrency = DefaultMaxItemsPerInvocation;
+ }
+
+ int targetWorkerCount = (int)Math.Ceiling(remainingWork / (decimal)concurrency);
+
+ string targetScaleMessage = $"Target worker count for function '{_functionId}' is '{targetWorkerCount}' (MonitoredContainerId='{_monitoredContainer.Id}', MonitoredContainerDatabaseId='{_monitoredContainer.Database.Id}', RemainingWork ='{remainingWork}', Concurrency='{concurrency}').";
+
+ if (partitionCount > 0 && targetWorkerCount > partitionCount)
+ {
+ targetScaleMessage += $" However, partition count is {partitionCount}. Adding more workers than partitions would not be helpful, so capping target worker count at {partitionCount}";
+ targetWorkerCount = partitionCount;
+ }
+
+ _logger.LogInformation(targetScaleMessage);
+
+ return new TargetScalerResult
+ {
+ TargetWorkerCount = targetWorkerCount
+ };
+ }
+ }
+}
diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs
index 12dab1e97..48ca72b22 100644
--- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs
+++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs
@@ -3,12 +3,11 @@
using System;
using System.Collections.Generic;
-using System.Diagnostics;
-using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
+using Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Scale;
@@ -16,7 +15,7 @@
namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB
{
- internal class CosmosDBTriggerListener : IListener, IScaleMonitor<CosmosDBTriggerMetrics>
+ internal class CosmosDBTriggerListener : IListener, IScaleMonitorProvider, ITargetScalerProvider
{
private const int ListenerNotRegistered = 0;
private const int ListenerRegistering = 1;
@@ -33,25 +32,12 @@ internal class CosmosDBTriggerListener : IListener, IScaleMonitor<CosmosDBTriggerMetrics>
+ private readonly IScaleMonitor<CosmosDBTriggerMetrics> _cosmosDBScaleMonitor;
+ private readonly ITargetScaler _cosmosDBTargetScaler;
private ChangeFeedProcessor _host;
private ChangeFeedProcessorBuilder _hostBuilder;
private int _listenerStatus;
- private static readonly Dictionary<string, string> KnownDocumentClientErrors = new Dictionary<string, string>()
- {
- { "Resource Not Found", "Please check that the CosmosDB container and leases container exist and are listed correctly in Functions config files." },
- { "The input authorization token can't serve the request", string.Empty },
- { "The MAC signature found in the HTTP request is not the same", string.Empty },
- { "Service is currently unavailable.", string.Empty },
- { "Entity with the specified id does not exist in the system.", string.Empty },
- { "Subscription owning the database account is disabled.", string.Empty },
- { "Request rate is large", string.Empty },
- { "PartitionKey value must be supplied for this operation.", "We do not support lease containers with partitions at this time. Please create a new lease collection without partitions." },
- { "The remote name could not be resolved:", string.Empty },
- { "Owner resource does not exist", string.Empty },
- { "The specified document collection is invalid", string.Empty }
- };
-
public CosmosDBTriggerListener(
ITriggeredFunctionExecutor executor,
string functionId,
@@ -73,6 +59,9 @@ public CosmosDBTriggerListener(
this._healthMonitor = new CosmosDBTriggerHealthMonitor(logger);
this._listenerLogDetails = $"prefix='{this._processorName}', monitoredContainer='{this._monitoredContainer.Id}', monitoredDatabase='{this._monitoredContainer.Database.Id}', " +
$"leaseContainer='{this._leaseContainer.Id}', leaseDatabase='{this._leaseContainer.Database.Id}', functionId='{this._functionId}'";
+
+ this._cosmosDBScaleMonitor = new CosmosDBScaleMonitor(_functionId, logger, _monitoredContainer, _leaseContainer, _processorName);
+ this._cosmosDBTargetScaler = new CosmosDBTargetScaler(_functionId, _cosmosDBAttribute, _monitoredContainer, _leaseContainer, _processorName, _logger);
}
public ScaleMonitorDescriptor Descriptor => this._scaleMonitorDescriptor;
@@ -232,220 +221,14 @@ private async Task ProcessChangesAsync(ChangeFeedProcessorContext context, IRead
}
}
- public async Task<CosmosDBTriggerMetrics> GetMetricsAsync()
- {
- int partitionCount = 0;
- long remainingWork = 0;
-
- try
- {
- List<ChangeFeedProcessorState> partitionWorkList = new List<ChangeFeedProcessorState>();
- ChangeFeedEstimator estimator = this._monitoredContainer.GetChangeFeedEstimator(this._processorName, this._leaseContainer);
- using (FeedIterator<ChangeFeedProcessorState> iterator = estimator.GetCurrentStateIterator())
- {
- while (iterator.HasMoreResults)
- {
- FeedResponse<ChangeFeedProcessorState> response = await iterator.ReadNextAsync();
- partitionWorkList.AddRange(response);
- }
- }
-
- partitionCount = partitionWorkList.Count;
- remainingWork = partitionWorkList.Sum(item => item.EstimatedLag);
- }
- catch (Exception e) when (e is CosmosException || e is InvalidOperationException)
- {
- if (!TryHandleCosmosException(e))
- {
- _logger.LogWarning(Events.OnScaling, "Unable to handle {0}: {1}", e.GetType().ToString(), e.Message);
- if (e is InvalidOperationException)
- {
- throw;
- }
- }
- }
- catch (System.Net.Http.HttpRequestException e)
- {
- string errormsg;
-
- var webException = e.InnerException as WebException;
- if (webException != null &&
- webException.Status == WebExceptionStatus.ProtocolError)
- {
- string statusCode = ((HttpWebResponse)webException.Response).StatusCode.ToString();
- string statusDesc = ((HttpWebResponse)webException.Response).StatusDescription;
- errormsg = string.Format("CosmosDBTrigger status {0}: {1}.", statusCode, statusDesc);
- }
- else if (webException != null &&
- webException.Status == WebExceptionStatus.NameResolutionFailure)
- {
- errormsg = string.Format("CosmosDBTrigger Exception message: {0}.", webException.Message);
- }
- else
- {
- errormsg = e.ToString();
- }
-
- _logger.LogWarning(Events.OnScaling, errormsg);
- }
-
- return new CosmosDBTriggerMetrics
- {
- Timestamp = DateTime.UtcNow,
- PartitionCount = partitionCount,
- RemainingWork = remainingWork
- };
- }
-
- public ScaleStatus GetScaleStatus(ScaleStatusContext context)
+ public IScaleMonitor GetMonitor()
{
- return GetScaleStatusCore(context.WorkerCount, context.Metrics?.Cast<CosmosDBTriggerMetrics>().ToArray());
+ return _cosmosDBScaleMonitor;
}
- async Task<ScaleMetrics> IScaleMonitor.GetMetricsAsync()
+ public ITargetScaler GetTargetScaler()
{
- return await GetMetricsAsync();
- }
-
- public ScaleStatus GetScaleStatus(ScaleStatusContext<CosmosDBTriggerMetrics> context)
- {
- return GetScaleStatusCore(context.WorkerCount, context.Metrics?.Cast<CosmosDBTriggerMetrics>().ToArray());
- }
-
- private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[] metrics)
- {
- ScaleStatus status = new ScaleStatus
- {
- Vote = ScaleVote.None
- };
-
- const int NumberOfSamplesToConsider = 5;
-
- // Unable to determine the correct vote with no metrics.
- if (metrics == null)
- {
- return status;
- }
-
- // We shouldn't assign more workers than there are partitions (Cosmos DB, Event Hub, Service Bus Queue/Topic)
- // This check is first, because it is independent of load or number of samples.
- int partitionCount = metrics.Length > 0 ? metrics.Last().PartitionCount : 0;
- if (partitionCount > 0 && partitionCount < workerCount)
- {
- status.Vote = ScaleVote.ScaleIn;
- _logger.LogInformation(Events.OnScaling, string.Format($"WorkerCount ({workerCount}) > PartitionCount ({partitionCount})."));
- _logger.LogInformation(Events.OnScaling, string.Format($"Number of instances ({workerCount}) is too high relative to number " +
- $"of partitions for container ({this._monitoredContainer.Id}, {partitionCount})."));
- return status;
- }
-
- // At least 5 samples are required to make a scale decision for the rest of the checks.
- if (metrics.Length < NumberOfSamplesToConsider)
- {
- return status;
- }
-
- // Maintain a minimum ratio of 1 worker per 1,000 items of remaining work.
- long latestRemainingWork = metrics.Last().RemainingWork;
- if (latestRemainingWork > workerCount * 1000)
- {
- status.Vote = ScaleVote.ScaleOut;
- _logger.LogInformation(Events.OnScaling, string.Format($"RemainingWork ({latestRemainingWork}) > WorkerCount ({workerCount}) * 1,000."));
- _logger.LogInformation(Events.OnScaling, string.Format($"Remaining work for container ({this._monitoredContainer.Id}, {latestRemainingWork}) " +
- $"is too high relative to the number of instances ({workerCount})."));
- return status;
- }
-
- bool documentsWaiting = metrics.All(m => m.RemainingWork > 0);
- if (documentsWaiting && partitionCount > 0 && partitionCount > workerCount)
- {
- status.Vote = ScaleVote.ScaleOut;
- _logger.LogInformation(Events.OnScaling, string.Format($"CosmosDB container '{this._monitoredContainer.Id}' has documents waiting to be processed."));
- _logger.LogInformation(Events.OnScaling, string.Format($"There are {workerCount} instances relative to {partitionCount} partitions."));
- return status;
- }
-
- // Check to see if the trigger source has been empty for a while. Only if all trigger sources are empty do we scale down.
- bool isIdle = metrics.All(m => m.RemainingWork == 0);
- if (isIdle)
- {
- status.Vote = ScaleVote.ScaleIn;
- _logger.LogInformation(Events.OnScaling, string.Format($"'{this._monitoredContainer.Id}' is idle."));
- return status;
- }
-
- // Samples are in chronological order. Check for a continuous increase in work remaining.
- // If detected, this results in an automatic scale out for the site container.
- bool remainingWorkIncreasing =
- IsTrueForLast(
- metrics,
- NumberOfSamplesToConsider,
- (prev, next) => prev.RemainingWork < next.RemainingWork) && metrics[0].RemainingWork > 0;
- if (remainingWorkIncreasing)
- {
- status.Vote = ScaleVote.ScaleOut;
- _logger.LogInformation(Events.OnScaling, $"Remaining work is increasing for '{this._monitoredContainer.Id}'.");
- return status;
- }
-
- bool remainingWorkDecreasing =
- IsTrueForLast(
- metrics,
- NumberOfSamplesToConsider,
- (prev, next) => prev.RemainingWork > next.RemainingWork);
- if (remainingWorkDecreasing)
- {
- status.Vote = ScaleVote.ScaleIn;
- _logger.LogInformation(Events.OnScaling, $"Remaining work is decreasing for '{this._monitoredContainer.Id}'.");
- return status;
- }
-
- _logger.LogInformation(Events.OnScaling, $"CosmosDB container '{this._monitoredContainer.Id}' is steady.");
-
- return status;
- }
-
- // Since all exceptions in the Cosmos client are thrown as CosmosExceptions, we have to parse their error strings because we dont have access to the internal types
- private bool TryHandleCosmosException(Exception exception)
- {
- string errormsg = null;
- string exceptionMessage = exception.Message;
-
- if (!string.IsNullOrEmpty(exceptionMessage))
- {
- foreach (KeyValuePair<string, string> exceptionString in KnownDocumentClientErrors)
- {
- if (exceptionMessage.IndexOf(exceptionString.Key, StringComparison.OrdinalIgnoreCase) >= 0)
- {
- errormsg = !string.IsNullOrEmpty(exceptionString.Value) ? exceptionString.Value : exceptionMessage;
- }
- }
- }
-
- if (!string.IsNullOrEmpty(errormsg))
- {
- _logger.LogWarning(errormsg);
- return true;
- }
-
- return false;
- }
-
- private static bool IsTrueForLast(IList<CosmosDBTriggerMetrics> metrics, int count, Func<CosmosDBTriggerMetrics, CosmosDBTriggerMetrics, bool> predicate)
- {
- Debug.Assert(count > 1, "count must be greater than 1.");
- Debug.Assert(count <= metrics.Count, "count must be less than or equal to the list size.");
-
- // Walks through the list from left to right starting at len(samples) - count.
- for (int i = metrics.Count - count; i < metrics.Count - 1; i++)
- {
- if (!predicate(metrics[i], metrics[i + 1]))
- {
- return false;
- }
- }
-
- return true;
+ return _cosmosDBTargetScaler;
}
}
}
diff --git a/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj b/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj
index 8a3067b33..3f8ce73fd 100644
--- a/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj
+++ b/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj
@@ -20,7 +20,7 @@
-
+
diff --git a/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBListenerTests.cs b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBListenerTests.cs
index bc5cf0cfa..326db586d 100644
--- a/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBListenerTests.cs
+++ b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBListenerTests.cs
@@ -9,6 +9,7 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
+using Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger;
using Microsoft.Azure.WebJobs.Extensions.Tests.Common;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Scale;
@@ -72,277 +73,6 @@ public CosmosDBListenerTests()
$"leaseContainer='{ContainerName}', leaseDatabase='{DatabaseName}', functionId='{this._functionId}'";
}
- [Fact]
- public void ScaleMonitorDescriptor_ReturnsExpectedValue()
- {
- Assert.Equal($"{_functionId}-cosmosdbtrigger-{DatabaseName}-{ContainerName}".ToLower(), _listener.Descriptor.Id);
- }
-
- [Fact]
- public async Task GetMetrics_ReturnsExpectedResult()
- {
- _estimatorIterator
- .SetupSequence(m => m.HasMoreResults)
- .Returns(true)
- .Returns(false);
-
- Mock<FeedResponse<ChangeFeedProcessorState>> response = new Mock<FeedResponse<ChangeFeedProcessorState>>();
- response
- .Setup(m => m.GetEnumerator())
- .Returns(new List<ChangeFeedProcessorState>().GetEnumerator());
-
- _estimatorIterator
- .Setup(m => m.ReadNextAsync(It.IsAny<CancellationToken>()))
- .Returns(Task.FromResult(response.Object));
-
- var metrics = await _listener.GetMetricsAsync();
-
- Assert.Equal(0, metrics.PartitionCount);
- Assert.Equal(0, metrics.RemainingWork);
- Assert.NotEqual(default(DateTime), metrics.Timestamp);
-
- _estimatorIterator
- .SetupSequence(m => m.HasMoreResults)
- .Returns(true)
- .Returns(false);
-
- response
- .Setup(m => m.GetEnumerator())
- .Returns(new List<ChangeFeedProcessorState>()
- {
- new ChangeFeedProcessorState("a", 5, string.Empty),
- new ChangeFeedProcessorState("b", 5, string.Empty),
- new ChangeFeedProcessorState("c", 5, string.Empty),
- new ChangeFeedProcessorState("d", 5, string.Empty)
- }.GetEnumerator());
-
- metrics = await _listener.GetMetricsAsync();
-
- Assert.Equal(4, metrics.PartitionCount);
- Assert.Equal(20, metrics.RemainingWork);
- Assert.NotEqual(default(DateTime), metrics.Timestamp);
-
- _estimatorIterator
- .SetupSequence(m => m.HasMoreResults)
- .Returns(true)
- .Returns(false);
-
- response
- .Setup(m => m.GetEnumerator())
- .Returns(new List<ChangeFeedProcessorState>()
- {
- new ChangeFeedProcessorState("a", 5, string.Empty),
- new ChangeFeedProcessorState("b", 5, string.Empty),
- new ChangeFeedProcessorState("c", 5, string.Empty),
- new ChangeFeedProcessorState("d", 5, string.Empty)
- }.GetEnumerator());
-
- // verify non-generic interface works as expected
- metrics = (CosmosDBTriggerMetrics)(await ((IScaleMonitor)_listener).GetMetricsAsync());
- Assert.Equal(4, metrics.PartitionCount);
- Assert.Equal(20, metrics.RemainingWork);
- Assert.NotEqual(default(DateTime), metrics.Timestamp);
- }
-
- [Fact]
- public async Task GetMetrics_HandlesExceptions()
- {
- // Can't test DocumentClientExceptions because they can't be constructed.
- _estimatorIterator
- .Setup(m => m.HasMoreResults).Returns(true);
-
- _estimatorIterator
- .SetupSequence(m => m.ReadNextAsync(It.IsAny<CancellationToken>()))
- .ThrowsAsync(new CosmosException("Resource not found", HttpStatusCode.NotFound, 0, string.Empty, 0))
- .ThrowsAsync(new InvalidOperationException("Unknown"))
- .ThrowsAsync(new HttpRequestException("Uh oh", new System.Net.WebException("Uh oh again", WebExceptionStatus.NameResolutionFailure)));
-
- var metrics = await _listener.GetMetricsAsync();
-
- Assert.Equal(0, metrics.PartitionCount);
- Assert.Equal(0, metrics.RemainingWork);
- Assert.NotEqual(default(DateTime), metrics.Timestamp);
-
- var warning = _loggerProvider.GetAllLogMessages().Single(p => p.Level == Microsoft.Extensions.Logging.LogLevel.Warning);
- Assert.Equal("Please check that the CosmosDB container and leases container exist and are listed correctly in Functions config files.", warning.FormattedMessage);
- _loggerProvider.ClearAllLogMessages();
-
- await Assert.ThrowsAsync<InvalidOperationException>(async () => await _listener.GetMetricsAsync());
-
- warning = _loggerProvider.GetAllLogMessages().Single(p => p.Level == Microsoft.Extensions.Logging.LogLevel.Warning);
- Assert.Equal("Unable to handle System.InvalidOperationException: Unknown", warning.FormattedMessage);
- _loggerProvider.ClearAllLogMessages();
-
- metrics = await _listener.GetMetricsAsync();
-
- Assert.Equal(0, metrics.PartitionCount);
- Assert.Equal(0, metrics.RemainingWork);
- Assert.NotEqual(default(DateTime), metrics.Timestamp);
-
- warning = _loggerProvider.GetAllLogMessages().Single(p => p.Level == Microsoft.Extensions.Logging.LogLevel.Warning);
- Assert.Equal("CosmosDBTrigger Exception message: Uh oh again.", warning.FormattedMessage);
- }
-
- [Fact]
- public void GetScaleStatus_NoMetrics_ReturnsVote_None()
- {
- var context = new ScaleStatusContext<CosmosDBTriggerMetrics>
- {
- WorkerCount = 1
- };
-
- var status = _listener.GetScaleStatus(context);
- Assert.Equal(ScaleVote.None, status.Vote);
-
- // verify the non-generic implementation works properly
- status = ((IScaleMonitor)_listener).GetScaleStatus(context);
- Assert.Equal(ScaleVote.None, status.Vote);
- }
-
- [Fact]
- public void GetScaleStatus_InstancesPerPartitionThresholdExceeded_ReturnsVote_ScaleIn()
- {
- var context = new ScaleStatusContext<CosmosDBTriggerMetrics>
- {
- WorkerCount = 2
- };
- var timestamp = DateTime.UtcNow;
- context.Metrics = new List<CosmosDBTriggerMetrics>
- {
- new CosmosDBTriggerMetrics { RemainingWork = 2500, PartitionCount = 1, Timestamp = timestamp.AddSeconds(15) },
- new CosmosDBTriggerMetrics { RemainingWork = 2505, PartitionCount = 1, Timestamp = timestamp.AddSeconds(15) },
- new CosmosDBTriggerMetrics { RemainingWork = 2612, PartitionCount = 1, Timestamp = timestamp.AddSeconds(15) },
- new CosmosDBTriggerMetrics { RemainingWork = 2700, PartitionCount = 1, Timestamp = timestamp.AddSeconds(15) },
- new CosmosDBTriggerMetrics { RemainingWork = 2810, PartitionCount = 1, Timestamp = timestamp.AddSeconds(15) },
- new CosmosDBTriggerMetrics { RemainingWork = 2900, PartitionCount = 1, Timestamp = timestamp.AddSeconds(15) },
- };
-
- var status = _listener.GetScaleStatus(context);
- Assert.Equal(ScaleVote.ScaleIn, status.Vote);
-
- var logs = _loggerProvider.GetAllLogMessages().ToArray();
- var log = logs[0];
- Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level);
- Assert.Equal("WorkerCount (2) > PartitionCount (1).", log.FormattedMessage);
- log = logs[1];
- Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level);
- Assert.Equal($"Number of instances (2) is too high relative to number of partitions for container ({ContainerName}, 1).", log.FormattedMessage);
- }
-
- [Fact]
- public void GetScaleStatus_MessagesPerWorkerThresholdExceeded_ReturnsVote_ScaleOut()
- {
- var context = new ScaleStatusContext
- {
- WorkerCount = 1
- };
- var timestamp = DateTime.UtcNow;
- context.Metrics = new List
- {
- new CosmosDBTriggerMetrics { RemainingWork = 2500, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
- new CosmosDBTriggerMetrics { RemainingWork = 2505, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
- new CosmosDBTriggerMetrics { RemainingWork = 2612, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
- new CosmosDBTriggerMetrics { RemainingWork = 2700, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
- new CosmosDBTriggerMetrics { RemainingWork = 2810, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
- new CosmosDBTriggerMetrics { RemainingWork = 2900, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
- };
-
- var status = _listener.GetScaleStatus(context);
- Assert.Equal(ScaleVote.ScaleOut, status.Vote);
-
- var logs = _loggerProvider.GetAllLogMessages().ToArray();
- var log = logs[0];
- Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level);
- Assert.Equal("RemainingWork (2900) > WorkerCount (1) * 1,000.", log.FormattedMessage);
- log = logs[1];
- Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level);
- Assert.Equal($"Remaining work for container ({ContainerName}, 2900) is too high relative to the number of instances (1).", log.FormattedMessage);
- }
-
- [Fact]
- public void GetScaleStatus_ConsistentRemainingWork_ReturnsVote_ScaleOut()
- {
- var context = new ScaleStatusContext
- {
- WorkerCount = 1
- };
- var timestamp = DateTime.UtcNow;
- context.Metrics = new List
- {
- new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 2, Timestamp = timestamp.AddSeconds(15) },
- new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 2, Timestamp = timestamp.AddSeconds(15) },
- new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 2, Timestamp = timestamp.AddSeconds(15) },
- new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 2, Timestamp = timestamp.AddSeconds(15) },
- new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 2, Timestamp = timestamp.AddSeconds(15) },
- new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 2, Timestamp = timestamp.AddSeconds(15) },
- };
-
- var status = _listener.GetScaleStatus(context);
- Assert.Equal(ScaleVote.ScaleOut, status.Vote);
-
- var logs = _loggerProvider.GetAllLogMessages().ToArray();
- var log = logs[0];
- Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level);
- Assert.Equal($"CosmosDB container '{ContainerName}' has documents waiting to be processed.", log.FormattedMessage);
- log = logs[1];
- Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level);
- Assert.Equal("There are 1 instances relative to 2 partitions.", log.FormattedMessage);
- }
-
- [Fact]
- public void GetScaleStatus_RemainingWorkIncreasing_ReturnsVote_ScaleOut()
- {
- var context = new ScaleStatusContext
- {
- WorkerCount = 1
- };
- var timestamp = DateTime.UtcNow;
- context.Metrics = new List
- {
- new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
- new CosmosDBTriggerMetrics { RemainingWork = 20, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
- new CosmosDBTriggerMetrics { RemainingWork = 40, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
- new CosmosDBTriggerMetrics { RemainingWork = 80, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
- new CosmosDBTriggerMetrics { RemainingWork = 100, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
- new CosmosDBTriggerMetrics { RemainingWork = 150, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
- };
-
- var status = _listener.GetScaleStatus(context);
- Assert.Equal(ScaleVote.ScaleOut, status.Vote);
-
- var logs = _loggerProvider.GetAllLogMessages().ToArray();
- var log = logs[0];
- Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level);
- Assert.Equal($"Remaining work is increasing for '{ContainerName}'.", log.FormattedMessage);
- }
-
- [Fact]
- public void GetScaleStatus_RemainingWorkDecreasing_ReturnsVote_ScaleIn()
- {
- var context = new ScaleStatusContext
- {
- WorkerCount = 1
- };
- var timestamp = DateTime.UtcNow;
- context.Metrics = new List
- {
- new CosmosDBTriggerMetrics { RemainingWork = 150, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
- new CosmosDBTriggerMetrics { RemainingWork = 100, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
- new CosmosDBTriggerMetrics { RemainingWork = 80, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
- new CosmosDBTriggerMetrics { RemainingWork = 40, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
- new CosmosDBTriggerMetrics { RemainingWork = 20, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
- new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
- };
-
- var status = _listener.GetScaleStatus(context);
- Assert.Equal(ScaleVote.ScaleIn, status.Vote);
-
- var logs = _loggerProvider.GetAllLogMessages().ToArray();
- var log = logs[0];
- Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level);
- Assert.Equal($"Remaining work is decreasing for '{ContainerName}'.", log.FormattedMessage);
- }
-
[Fact]
public async Task StartAsync_Retries()
{
diff --git a/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBMetricsProviderTests.cs b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBMetricsProviderTests.cs
new file mode 100644
index 000000000..6da53c646
--- /dev/null
+++ b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBMetricsProviderTests.cs
@@ -0,0 +1,167 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Net.Http;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Azure.Cosmos;
+using Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger;
+using Microsoft.Azure.WebJobs.Extensions.Tests.Common;
+using Microsoft.Extensions.Logging;
+using Moq;
+using Xunit;
+
+namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Tests.Trigger
+{
+ public class CosmosDBMetricsProviderTests
+ {
+ private static readonly string DatabaseName = "testDb";
+ private static readonly string ContainerName = "testContainer";
+ private static readonly string ProcessorName = "theProcessor";
+
+ private readonly ILoggerFactory _loggerFactory;
+ private readonly TestLoggerProvider _loggerProvider = new TestLoggerProvider();
+ private readonly Mock> _estimatorIterator;
+ private readonly Mock _monitoredContainer;
+ private readonly Mock _leasesContainer;
+ private readonly CosmosDBMetricsProvider _cosmosDbMetricsProvider;
+
+ public CosmosDBMetricsProviderTests()
+ {
+ _loggerFactory = new LoggerFactory();
+ _loggerFactory.AddProvider(_loggerProvider);
+
+ var database = new Mock(MockBehavior.Strict);
+ database.Setup(d => d.Id).Returns(DatabaseName);
+
+ _monitoredContainer = new Mock(MockBehavior.Strict);
+ _monitoredContainer.Setup(m => m.Id).Returns(ContainerName);
+ _monitoredContainer.Setup(m => m.Database).Returns(database.Object);
+
+ _estimatorIterator = new Mock>();
+
+ Mock estimator = new Mock();
+ estimator.Setup(m => m.GetCurrentStateIterator(It.IsAny()))
+ .Returns(_estimatorIterator.Object);
+
+ _leasesContainer = new Mock(MockBehavior.Strict);
+ _leasesContainer.Setup(m => m.Id).Returns(ContainerName);
+ _leasesContainer.Setup(m => m.Database).Returns(database.Object);
+
+ _monitoredContainer
+ .Setup(m => m.GetChangeFeedEstimator(It.Is(s => s == ProcessorName), It.Is(c => c == _leasesContainer.Object)))
+ .Returns(estimator.Object);
+
+ _cosmosDbMetricsProvider = new CosmosDBMetricsProvider(_loggerFactory.CreateLogger(), _monitoredContainer.Object, _leasesContainer.Object, ProcessorName);
+ }
+
+ [Fact]
+ public async Task GetMetrics_ReturnsExpectedResult()
+ {
+ _estimatorIterator
+ .SetupSequence(m => m.HasMoreResults)
+ .Returns(true)
+ .Returns(false);
+
+ Mock> response = new Mock>();
+ response
+ .Setup(m => m.GetEnumerator())
+ .Returns(new List().GetEnumerator());
+
+ _estimatorIterator
+ .Setup(m => m.ReadNextAsync(It.IsAny()))
+ .Returns(Task.FromResult(response.Object));
+
+ var metrics = await _cosmosDbMetricsProvider.GetMetricsAsync();
+
+ Assert.Equal(0, metrics.PartitionCount);
+ Assert.Equal(0, metrics.RemainingWork);
+ Assert.NotEqual(default(DateTime), metrics.Timestamp);
+
+ _estimatorIterator
+ .SetupSequence(m => m.HasMoreResults)
+ .Returns(true)
+ .Returns(false);
+
+ response
+ .Setup(m => m.GetEnumerator())
+ .Returns(new List()
+ {
+ new ChangeFeedProcessorState("a", 5, string.Empty),
+ new ChangeFeedProcessorState("b", 5, string.Empty),
+ new ChangeFeedProcessorState("c", 5, string.Empty),
+ new ChangeFeedProcessorState("d", 5, string.Empty)
+ }.GetEnumerator());
+
+ metrics = await _cosmosDbMetricsProvider.GetMetricsAsync();
+
+ Assert.Equal(4, metrics.PartitionCount);
+ Assert.Equal(20, metrics.RemainingWork);
+ Assert.NotEqual(default(DateTime), metrics.Timestamp);
+
+ _estimatorIterator
+ .SetupSequence(m => m.HasMoreResults)
+ .Returns(true)
+ .Returns(false);
+
+ response
+ .Setup(m => m.GetEnumerator())
+ .Returns(new List()
+ {
+ new ChangeFeedProcessorState("a", 5, string.Empty),
+ new ChangeFeedProcessorState("b", 5, string.Empty),
+ new ChangeFeedProcessorState("c", 5, string.Empty),
+ new ChangeFeedProcessorState("d", 5, string.Empty)
+ }.GetEnumerator());
+
+ // verify non-generic interface works as expected
+ metrics = (CosmosDBTriggerMetrics)await _cosmosDbMetricsProvider.GetMetricsAsync();
+ Assert.Equal(4, metrics.PartitionCount);
+ Assert.Equal(20, metrics.RemainingWork);
+ Assert.NotEqual(default(DateTime), metrics.Timestamp);
+ }
+
+ [Fact]
+ public async Task GetMetrics_HandlesExceptions()
+ {
+ // Can't test DocumentClientExceptions because they can't be constructed.
+ _estimatorIterator
+ .Setup(m => m.HasMoreResults).Returns(true);
+
+ _estimatorIterator
+ .SetupSequence(m => m.ReadNextAsync(It.IsAny()))
+ .ThrowsAsync(new CosmosException("Resource not found", HttpStatusCode.NotFound, 0, string.Empty, 0))
+ .ThrowsAsync(new InvalidOperationException("Unknown"))
+ .ThrowsAsync(new HttpRequestException("Uh oh", new System.Net.WebException("Uh oh again", WebExceptionStatus.NameResolutionFailure)));
+
+ var metrics = (CosmosDBTriggerMetrics)await _cosmosDbMetricsProvider.GetMetricsAsync();
+
+ Assert.Equal(0, metrics.PartitionCount);
+ Assert.Equal(0, metrics.RemainingWork);
+ Assert.NotEqual(default(DateTime), metrics.Timestamp);
+
+ var warning = _loggerProvider.GetAllLogMessages().Single(p => p.Level == Microsoft.Extensions.Logging.LogLevel.Warning);
+ Assert.Equal("Please check that the CosmosDB container and leases container exist and are listed correctly in Functions config files.", warning.FormattedMessage);
+ _loggerProvider.ClearAllLogMessages();
+
+ await Assert.ThrowsAsync(async () => await _cosmosDbMetricsProvider.GetMetricsAsync());
+
+ warning = _loggerProvider.GetAllLogMessages().Single(p => p.Level == Microsoft.Extensions.Logging.LogLevel.Warning);
+ Assert.Equal("Unable to handle System.InvalidOperationException: Unknown", warning.FormattedMessage);
+ _loggerProvider.ClearAllLogMessages();
+
+ metrics = (CosmosDBTriggerMetrics)await _cosmosDbMetricsProvider.GetMetricsAsync();
+
+ Assert.Equal(0, metrics.PartitionCount);
+ Assert.Equal(0, metrics.RemainingWork);
+ Assert.NotEqual(default(DateTime), metrics.Timestamp);
+
+ warning = _loggerProvider.GetAllLogMessages().Single(p => p.Level == Microsoft.Extensions.Logging.LogLevel.Warning);
+ Assert.Equal("CosmosDBTrigger Exception message: Uh oh again.", warning.FormattedMessage);
+ }
+ }
+}
diff --git a/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBScaleMonitorTests.cs b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBScaleMonitorTests.cs
new file mode 100644
index 000000000..fa436bc8f
--- /dev/null
+++ b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBScaleMonitorTests.cs
@@ -0,0 +1,241 @@
+// Copyright (c) .NET Foundation. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net;
+using System.Net.Http;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Azure.Cosmos;
+using Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger;
+using Microsoft.Azure.WebJobs.Extensions.Tests.Common;
+using Microsoft.Azure.WebJobs.Host.Executors;
+using Microsoft.Azure.WebJobs.Host.Scale;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Logging.Abstractions;
+using Moq;
+using Xunit;
+
+namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Tests.Trigger
+{
+ public class CosmosDBScaleMonitorTests
+ {
+ private static readonly string DatabaseName = "testDb";
+ private static readonly string ContainerName = "testContainer";
+ private static readonly string ProcessorName = "theProcessor";
+
+ private readonly TestLoggerProvider _loggerProvider = new TestLoggerProvider();
+ private readonly ILoggerFactory _loggerFactory;
+ private readonly Mock _mockExecutor;
+ private readonly Mock _monitoredContainer;
+ private readonly Mock _leasesContainer;
+ private readonly Mock> _estimatorIterator;
+ private readonly string _functionId;
+ private readonly string _logDetails;
+ private readonly IScaleMonitor _scaleMonitor;
+
+ public CosmosDBScaleMonitorTests()
+ {
+ _loggerFactory = new LoggerFactory();
+ _loggerFactory.AddProvider(_loggerProvider);
+
+ _mockExecutor = new Mock();
+ _functionId = "testfunctionid";
+
+ var database = new Mock(MockBehavior.Strict);
+ database.Setup(d => d.Id).Returns(DatabaseName);
+
+ _monitoredContainer = new Mock(MockBehavior.Strict);
+ _monitoredContainer.Setup(m => m.Id).Returns(ContainerName);
+ _monitoredContainer.Setup(m => m.Database).Returns(database.Object);
+
+ _estimatorIterator = new Mock>();
+
+ Mock estimator = new Mock();
+ estimator.Setup(m => m.GetCurrentStateIterator(It.IsAny()))
+ .Returns(_estimatorIterator.Object);
+
+ _leasesContainer = new Mock(MockBehavior.Strict);
+ _leasesContainer.Setup(m => m.Id).Returns(ContainerName);
+ _leasesContainer.Setup(m => m.Database).Returns(database.Object);
+
+ _monitoredContainer
+ .Setup(m => m.GetChangeFeedEstimator(It.Is(s => s == ProcessorName), It.Is(c => c == _leasesContainer.Object)))
+ .Returns(estimator.Object);
+
+ var attribute = new CosmosDBTriggerAttribute(DatabaseName, ContainerName);
+
+ _logDetails = $"prefix='{ProcessorName}', monitoredContainer='{ContainerName}', monitoredDatabase='{DatabaseName}', " +
+ $"leaseContainer='{ContainerName}', leaseDatabase='{DatabaseName}', functionId='{this._functionId}'";
+ _scaleMonitor = new CosmosDBScaleMonitor(_functionId, _loggerFactory.CreateLogger(), _monitoredContainer.Object, _leasesContainer.Object, ProcessorName);
+ }
+
+ [Fact]
+ public void ScaleMonitorDescriptor_ReturnsExpectedValue()
+ {
+ Assert.Equal($"{_functionId}-cosmosdbtrigger-{DatabaseName}-{ContainerName}".ToLower(), _scaleMonitor.Descriptor.Id);
+ }
+
+ [Fact]
+ public void GetScaleStatus_NoMetrics_ReturnsVote_None()
+ {
+ var context = new ScaleStatusContext
+ {
+ WorkerCount = 1
+ };
+
+ var status = _scaleMonitor.GetScaleStatus(context);
+ Assert.Equal(ScaleVote.None, status.Vote);
+
+ // verify the non-generic implementation works properly
+ status = ((CosmosDBScaleMonitor)_scaleMonitor).GetScaleStatus(context);
+ Assert.Equal(ScaleVote.None, status.Vote);
+ }
+
+ [Fact]
+ public void GetScaleStatus_InstancesPerPartitionThresholdExceeded_ReturnsVote_ScaleIn()
+ {
+ var context = new ScaleStatusContext
+ {
+ WorkerCount = 2
+ };
+ var timestamp = DateTime.UtcNow;
+ context.Metrics = new List
+ {
+ new CosmosDBTriggerMetrics { RemainingWork = 2500, PartitionCount = 1, Timestamp = timestamp.AddSeconds(15) },
+ new CosmosDBTriggerMetrics { RemainingWork = 2505, PartitionCount = 1, Timestamp = timestamp.AddSeconds(15) },
+ new CosmosDBTriggerMetrics { RemainingWork = 2612, PartitionCount = 1, Timestamp = timestamp.AddSeconds(15) },
+ new CosmosDBTriggerMetrics { RemainingWork = 2700, PartitionCount = 1, Timestamp = timestamp.AddSeconds(15) },
+ new CosmosDBTriggerMetrics { RemainingWork = 2810, PartitionCount = 1, Timestamp = timestamp.AddSeconds(15) },
+ new CosmosDBTriggerMetrics { RemainingWork = 2900, PartitionCount = 1, Timestamp = timestamp.AddSeconds(15) },
+ };
+
+ var status = ((CosmosDBScaleMonitor)_scaleMonitor).GetScaleStatus(context);
+ Assert.Equal(ScaleVote.ScaleIn, status.Vote);
+
+ var logs = _loggerProvider.GetAllLogMessages().ToArray();
+ var log = logs[0];
+ Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level);
+ Assert.Equal("WorkerCount (2) > PartitionCount (1).", log.FormattedMessage);
+ log = logs[1];
+ Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level);
+ Assert.Equal($"Number of instances (2) is too high relative to number of partitions for container ({ContainerName}, 1).", log.FormattedMessage);
+ }
+
+ [Fact]
+ public void GetScaleStatus_MessagesPerWorkerThresholdExceeded_ReturnsVote_ScaleOut()
+ {
+ var context = new ScaleStatusContext
+ {
+ WorkerCount = 1
+ };
+ var timestamp = DateTime.UtcNow;
+ context.Metrics = new List
+ {
+ new CosmosDBTriggerMetrics { RemainingWork = 2500, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
+ new CosmosDBTriggerMetrics { RemainingWork = 2505, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
+ new CosmosDBTriggerMetrics { RemainingWork = 2612, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
+ new CosmosDBTriggerMetrics { RemainingWork = 2700, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
+ new CosmosDBTriggerMetrics { RemainingWork = 2810, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
+ new CosmosDBTriggerMetrics { RemainingWork = 2900, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
+ };
+
+ var status = ((CosmosDBScaleMonitor)_scaleMonitor).GetScaleStatus(context);
+ Assert.Equal(ScaleVote.ScaleOut, status.Vote);
+
+ var logs = _loggerProvider.GetAllLogMessages().ToArray();
+ var log = logs[0];
+ Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level);
+ Assert.Equal("RemainingWork (2900) > WorkerCount (1) * 1,000.", log.FormattedMessage);
+ log = logs[1];
+ Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level);
+ Assert.Equal($"Remaining work for container ({ContainerName}, 2900) is too high relative to the number of instances (1).", log.FormattedMessage);
+ }
+
+ [Fact]
+ public void GetScaleStatus_ConsistentRemainingWork_ReturnsVote_ScaleOut()
+ {
+ var context = new ScaleStatusContext
+ {
+ WorkerCount = 1
+ };
+ var timestamp = DateTime.UtcNow;
+ context.Metrics = new List
+ {
+ new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 2, Timestamp = timestamp.AddSeconds(15) },
+ new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 2, Timestamp = timestamp.AddSeconds(15) },
+ new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 2, Timestamp = timestamp.AddSeconds(15) },
+ new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 2, Timestamp = timestamp.AddSeconds(15) },
+ new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 2, Timestamp = timestamp.AddSeconds(15) },
+ new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 2, Timestamp = timestamp.AddSeconds(15) },
+ };
+
+ var status = ((CosmosDBScaleMonitor)_scaleMonitor).GetScaleStatus(context);
+ Assert.Equal(ScaleVote.ScaleOut, status.Vote);
+
+ var logs = _loggerProvider.GetAllLogMessages().ToArray();
+ var log = logs[0];
+ Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level);
+ Assert.Equal($"CosmosDB container '{ContainerName}' has documents waiting to be processed.", log.FormattedMessage);
+ log = logs[1];
+ Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level);
+ Assert.Equal("There are 1 instances relative to 2 partitions.", log.FormattedMessage);
+ }
+
+ [Fact]
+ public void GetScaleStatus_RemainingWorkIncreasing_ReturnsVote_ScaleOut()
+ {
+ var context = new ScaleStatusContext
+ {
+ WorkerCount = 1
+ };
+ var timestamp = DateTime.UtcNow;
+ context.Metrics = new List
+ {
+ new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
+ new CosmosDBTriggerMetrics { RemainingWork = 20, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
+ new CosmosDBTriggerMetrics { RemainingWork = 40, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
+ new CosmosDBTriggerMetrics { RemainingWork = 80, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
+ new CosmosDBTriggerMetrics { RemainingWork = 100, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
+ new CosmosDBTriggerMetrics { RemainingWork = 150, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
+ };
+
+ var status = ((CosmosDBScaleMonitor)_scaleMonitor).GetScaleStatus(context);
+ Assert.Equal(ScaleVote.ScaleOut, status.Vote);
+
+ var logs = _loggerProvider.GetAllLogMessages().ToArray();
+ var log = logs[0];
+ Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level);
+ Assert.Equal($"Remaining work is increasing for '{ContainerName}'.", log.FormattedMessage);
+ }
+
+ [Fact]
+ public void GetScaleStatus_RemainingWorkDecreasing_ReturnsVote_ScaleIn()
+ {
+ var context = new ScaleStatusContext
+ {
+ WorkerCount = 1
+ };
+ var timestamp = DateTime.UtcNow;
+ context.Metrics = new List
+ {
+ new CosmosDBTriggerMetrics { RemainingWork = 150, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
+ new CosmosDBTriggerMetrics { RemainingWork = 100, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
+ new CosmosDBTriggerMetrics { RemainingWork = 80, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
+ new CosmosDBTriggerMetrics { RemainingWork = 40, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
+ new CosmosDBTriggerMetrics { RemainingWork = 20, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
+ new CosmosDBTriggerMetrics { RemainingWork = 10, PartitionCount = 0, Timestamp = timestamp.AddSeconds(15) },
+ };
+
+ var status = ((CosmosDBScaleMonitor)_scaleMonitor).GetScaleStatus(context);
+ Assert.Equal(ScaleVote.ScaleIn, status.Vote);
+
+ var logs = _loggerProvider.GetAllLogMessages().ToArray();
+ var log = logs[0];
+ Assert.Equal(Microsoft.Extensions.Logging.LogLevel.Information, log.Level);
+ Assert.Equal($"Remaining work is decreasing for '{ContainerName}'.", log.FormattedMessage);
+ }
+ }
+}
diff --git a/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBTargetScalerTests.cs b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBTargetScalerTests.cs
new file mode 100644
index 000000000..7f969f595
--- /dev/null
+++ b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBTargetScalerTests.cs
@@ -0,0 +1,114 @@
+using System;
+using System.Collections;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using Microsoft.Azure.Cosmos;
+using Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger;
+using Microsoft.Azure.WebJobs.Extensions.Tests.Common;
+using Microsoft.Azure.WebJobs.Host.Scale;
+using Microsoft.Extensions.Logging;
+using Moq;
+using Xunit;
+
+namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Tests.Trigger
+{
+ public class CosmosDBTargetScalerTests
+ {
+ private static readonly string DatabaseName = "testDb";
+ private static readonly string ContainerName = "testContainer";
+ private static readonly string ProcessorName = "theProcessor";
+
+ private readonly TestLoggerProvider _loggerProvider = new TestLoggerProvider();
+ private readonly ILoggerFactory _loggerFactory;
+ private readonly Mock _monitoredContainer;
+ private readonly Mock _leasesContainer;
+ private readonly Mock> _estimatorIterator;
+ private readonly CosmosDBTriggerListener _listener;
+ private readonly string _functionId;
+ private readonly string _logDetails;
+ private readonly CosmosDBTargetScaler _targetScaler;
+
+ private CosmosDBTriggerAttribute _attribute;
+
+ public CosmosDBTargetScalerTests()
+ {
+ _loggerFactory = new LoggerFactory();
+ _loggerFactory.AddProvider(_loggerProvider);
+
+ _functionId = "testfunctionid";
+
+ var database = new Mock(MockBehavior.Strict);
+ database.Setup(d => d.Id).Returns(DatabaseName);
+
+ _monitoredContainer = new Mock(MockBehavior.Strict);
+ _monitoredContainer.Setup(m => m.Id).Returns(ContainerName);
+ _monitoredContainer.Setup(m => m.Database).Returns(database.Object);
+
+ _estimatorIterator = new Mock>();
+
+ Mock estimator = new Mock();
+ estimator.Setup(m => m.GetCurrentStateIterator(It.IsAny()))
+ .Returns(_estimatorIterator.Object);
+
+ _leasesContainer = new Mock(MockBehavior.Strict);
+ _leasesContainer.Setup(m => m.Id).Returns(ContainerName);
+ _leasesContainer.Setup(m => m.Database).Returns(database.Object);
+
+ _monitoredContainer
+ .Setup(m => m.GetChangeFeedEstimator(It.Is(s => s == ProcessorName), It.Is(c => c == _leasesContainer.Object)))
+ .Returns(estimator.Object);
+
+ _attribute = new CosmosDBTriggerAttribute(DatabaseName, ContainerName);
+ _targetScaler = new CosmosDBTargetScaler(_functionId, _attribute, _monitoredContainer.Object, _leasesContainer.Object, ProcessorName, _loggerFactory.CreateLogger>());
+ }
+
+ [Theory]
+ [InlineData(null, 200, 2, 2)]
+ [InlineData(null, 300, 2, 2)]
+ [InlineData(null, 300, 0, 3)]
+ [InlineData(null, 0, 3, 0)]
+ [InlineData(50, 200, 0, 4)]
+ [InlineData(-50, 200, 0, 2)]
+ public void GetScaleResultInternal(int? concurrency, long remainingWork, int partitionCount, int expectedTargetWorkerCount)
+ {
+ TargetScalerContext targetScalerContext = new TargetScalerContext
+ {
+ InstanceConcurrency = concurrency,
+ };
+
+ TargetScalerResult result = _targetScaler.GetScaleResultInternal(targetScalerContext, remainingWork, partitionCount);
+
+ Assert.Equal(expectedTargetWorkerCount, result.TargetWorkerCount);
+ }
+
+ [Fact]
+ public async Task GetScaleResultAsync()
+ {
+ TargetScalerContext targetScalerContext = new TargetScalerContext { };
+
+ _estimatorIterator
+ .SetupSequence(m => m.HasMoreResults)
+ .Returns(true)
+ .Returns(false);
+
+ Mock> response = new Mock>();
+ response
+ .Setup(m => m.GetEnumerator())
+ .Returns(new List()
+ {
+ new ChangeFeedProcessorState("a", 100, string.Empty),
+ new ChangeFeedProcessorState("b", 100, string.Empty),
+ new ChangeFeedProcessorState("c", 50, string.Empty),
+ new ChangeFeedProcessorState("d", 100, string.Empty)
+ }.GetEnumerator());
+
+ _estimatorIterator
+ .Setup(m => m.ReadNextAsync(It.IsAny()))
+ .Returns(Task.FromResult(response.Object));
+
+ TargetScalerResult result = await _targetScaler.GetScaleResultAsync(targetScalerContext);
+ Assert.Equal(4, result.TargetWorkerCount);
+ }
+ }
+}
diff --git a/test/WebJobs.Extensions.CosmosDB.Tests/WebJobs.Extensions.CosmosDB.Tests.csproj b/test/WebJobs.Extensions.CosmosDB.Tests/WebJobs.Extensions.CosmosDB.Tests.csproj
index a051f2141..5280ea94e 100644
--- a/test/WebJobs.Extensions.CosmosDB.Tests/WebJobs.Extensions.CosmosDB.Tests.csproj
+++ b/test/WebJobs.Extensions.CosmosDB.Tests/WebJobs.Extensions.CosmosDB.Tests.csproj
@@ -1,7 +1,7 @@
- netcoreapp2.1
+ netcoreapp3.1
false
Microsoft.Azure.WebJobs.Extensions.CosmosDB.Tests
Microsoft.Azure.WebJobs.Extensions.CosmosDB.Tests
diff --git a/test/WebJobs.Extensions.Http.Tests/WebJobs.Extensions.Http.Tests.csproj b/test/WebJobs.Extensions.Http.Tests/WebJobs.Extensions.Http.Tests.csproj
index a12e5c1fe..61a1fc883 100644
--- a/test/WebJobs.Extensions.Http.Tests/WebJobs.Extensions.Http.Tests.csproj
+++ b/test/WebJobs.Extensions.Http.Tests/WebJobs.Extensions.Http.Tests.csproj
@@ -1,7 +1,7 @@
- netcoreapp2.1
+ netcoreapp3.1
false
Microsoft.Azure.WebJobs.Extensions.Http.Tests
Microsoft.Azure.WebJobs.Extensions.Http.Tests
diff --git a/test/WebJobs.Extensions.MobileApps.Tests/WebJobs.Extensions.MobileApps.Tests.csproj b/test/WebJobs.Extensions.MobileApps.Tests/WebJobs.Extensions.MobileApps.Tests.csproj
index 5ca5ace66..39a60d1d9 100644
--- a/test/WebJobs.Extensions.MobileApps.Tests/WebJobs.Extensions.MobileApps.Tests.csproj
+++ b/test/WebJobs.Extensions.MobileApps.Tests/WebJobs.Extensions.MobileApps.Tests.csproj
@@ -1,7 +1,7 @@
- netcoreapp2.1
+ netcoreapp3.1
false
Microsoft.Azure.WebJobs.Extensions.MobileApps.Tests
Microsoft.Azure.WebJobs.Extensions.MobileApps.Tests
diff --git a/test/WebJobs.Extensions.SendGrid.Tests/WebJobs.Extensions.SendGrid.Tests.csproj b/test/WebJobs.Extensions.SendGrid.Tests/WebJobs.Extensions.SendGrid.Tests.csproj
index ab11296d4..10f903b22 100644
--- a/test/WebJobs.Extensions.SendGrid.Tests/WebJobs.Extensions.SendGrid.Tests.csproj
+++ b/test/WebJobs.Extensions.SendGrid.Tests/WebJobs.Extensions.SendGrid.Tests.csproj
@@ -1,7 +1,7 @@
- netcoreapp2.1
+ netcoreapp3.1
false
Microsoft.Azure.WebJobs.Extensions.SendGrid.Tests
Microsoft.Azure.WebJobs.Extensions.SendGrid.Tests
diff --git a/test/WebJobs.Extensions.Tests.Common/WebJobs.Extensions.Tests.Common.csproj b/test/WebJobs.Extensions.Tests.Common/WebJobs.Extensions.Tests.Common.csproj
index f9fd45653..41e4f4e1f 100644
--- a/test/WebJobs.Extensions.Tests.Common/WebJobs.Extensions.Tests.Common.csproj
+++ b/test/WebJobs.Extensions.Tests.Common/WebJobs.Extensions.Tests.Common.csproj
@@ -1,7 +1,7 @@
- netcoreapp2.1
+ netcoreapp3.1
false
Microsoft.Azure.WebJobs.Extensions.Tests.Common
Microsoft.Azure.WebJobs.Extensions.Tests.Common
diff --git a/test/WebJobs.Extensions.Tests/Extensions/Timers/Scheduling/FileSystemScheduleMonitorTests.cs b/test/WebJobs.Extensions.Tests/Extensions/Timers/Scheduling/FileSystemScheduleMonitorTests.cs
index 63e68eecd..8300fad9d 100644
--- a/test/WebJobs.Extensions.Tests/Extensions/Timers/Scheduling/FileSystemScheduleMonitorTests.cs
+++ b/test/WebJobs.Extensions.Tests/Extensions/Timers/Scheduling/FileSystemScheduleMonitorTests.cs
@@ -77,7 +77,7 @@ public void StatusFilePath_ThrowsWhenPathDoesNotExist()
ArgumentException expectedException =
Assert.Throws(() => localMonitor.StatusFilePath = invalidPath);
Assert.Equal("value", expectedException.ParamName);
- Assert.Equal("The specified path does not exist.\r\nParameter name: value", expectedException.Message);
+ Assert.Equal("The specified path does not exist. (Parameter 'value')", expectedException.Message);
}
[Fact]
diff --git a/test/WebJobs.Extensions.Tests/WebJobs.Extensions.Tests.csproj b/test/WebJobs.Extensions.Tests/WebJobs.Extensions.Tests.csproj
index 533b800d4..5a7dd414f 100644
--- a/test/WebJobs.Extensions.Tests/WebJobs.Extensions.Tests.csproj
+++ b/test/WebJobs.Extensions.Tests/WebJobs.Extensions.Tests.csproj
@@ -1,7 +1,7 @@
- netcoreapp2.1
+ netcoreapp3.1
latest
false
Microsoft.Azure.WebJobs.Extensions.Tests
diff --git a/test/WebJobs.Extensions.Timers.Storage.Tests/WebJobs.Extensions.Timers.Storage.Tests.csproj b/test/WebJobs.Extensions.Timers.Storage.Tests/WebJobs.Extensions.Timers.Storage.Tests.csproj
index 5afdf2106..6c44f35c4 100644
--- a/test/WebJobs.Extensions.Timers.Storage.Tests/WebJobs.Extensions.Timers.Storage.Tests.csproj
+++ b/test/WebJobs.Extensions.Timers.Storage.Tests/WebJobs.Extensions.Timers.Storage.Tests.csproj
@@ -1,7 +1,7 @@
- netcoreapp2.1
+ netcoreapp3.1
false
Microsoft.Azure.WebJobs.Extensions.Timers.Storage.Tests
Microsoft.Azure.WebJobs.Extensions.Timers.Storage.Tests
diff --git a/test/WebJobs.Extensions.Twilio.Tests/WebJobs.Extensions.Twilio.Tests.csproj b/test/WebJobs.Extensions.Twilio.Tests/WebJobs.Extensions.Twilio.Tests.csproj
index 28ef1e753..af30a81cc 100644
--- a/test/WebJobs.Extensions.Twilio.Tests/WebJobs.Extensions.Twilio.Tests.csproj
+++ b/test/WebJobs.Extensions.Twilio.Tests/WebJobs.Extensions.Twilio.Tests.csproj
@@ -1,7 +1,7 @@
- netcoreapp2.1
+ netcoreapp3.1
false
Microsoft.Azure.WebJobs.Extensions.Twilio.Tests
Microsoft.Azure.WebJobs.Extensions.Twilio.Tests