Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
977a4db
separate out scale monitor code from cosmosdbtriggerlistener
Nov 1, 2022
2d786db
create CosmosDBMetricsProvider
Nov 1, 2022
5417279
fixing unit tests
Nov 1, 2022
574e381
fix unit tests
Nov 1, 2022
cdf1b59
add cosmosdbtargetscaler and implement interface methods
Nov 1, 2022
32e3479
remove csproj upgrade -- for local dev only
Nov 2, 2022
7253469
remove 3.1 upgrade in test csproj for local testing only
Nov 2, 2022
e413234
change target framework for cdb proj back to 2.0
Nov 2, 2022
a78f6f5
add unit tests for cosmosdbtargetscaler
Nov 2, 2022
b704b01
add tests for getscaleresultasync in cosmosdbtargetscaler
Nov 2, 2022
eb3d774
upgrade netcoreapp version to 3.1
Nov 2, 2022
ff57642
make cdbtriggerlistener implement itargetscaler
Nov 3, 2022
b6eef5b
make targetscaler, scalemonitor, and scalemetrics back to internal fo…
Nov 7, 2022
cfb1240
make targetscaler internal
Nov 7, 2022
6edfd38
Update src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTargetScaler.cs
chiangvincent Nov 17, 2022
3d30b67
use ternary operator when deciding which concurrency value to use
Nov 17, 2022
fc197fa
remove extra quotation in target
Nov 17, 2022
5d86627
move unit tests from listener class to new metrics provider class
Nov 17, 2022
2e63b30
moved cosmosdb scalemonitor tests to separate test class
Nov 17, 2022
4592976
upgrade webjobs sdk version, use new scale monitor descriptor
Nov 21, 2022
da2da07
make cosmosdbtargetscaler, scalemonitor, and scalemetrics public
Jan 12, 2023
8f0c345
Merge branch 'dev' of https://github.com/Azure/azure-webjobs-sdk-exte…
Jan 12, 2023
e5e980b
Merge branch 'dev' into user/vchiang/cosmosDbTbs
Jan 12, 2023
4050282
use prerelease version for webjobs sdk, add xml docs to public classes
Jan 13, 2023
1f5e7b6
Revert "use prerelease version for webjobs sdk, add xml docs to publi…
Jan 17, 2023
e280e40
Revert "Revert "use prerelease version for webjobs sdk, add xml docs …
Jan 17, 2023
158bd9e
make types internal
Jan 17, 2023
e85ac74
remove partitionkey value error message, no longer supported
Feb 2, 2023
428b0d1
remove prereleae version from webjobs sdk since it has been released
Feb 6, 2023
9b4def8
change netcoreapp back to 2.1 to pass build
Feb 6, 2023
413de81
change netcore back to 2.1 for cdb test to pass build
Feb 6, 2023
6580b4c
upgrade target framework to 3.1 to pass build in test csproj
Feb 7, 2023
dfb3778
upgrade version in appveyor to netcore 3.1 as well
Feb 7, 2023
a18019a
upgrade all test csproj to netcore3.1
Feb 7, 2023
d61416a
run tests in minimal moe
Feb 7, 2023
3308138
change run tests back to q
Feb 7, 2023
add4203
fix timer scheduling broken unit test
Feb 8, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ install:
- ps: >
$env:CommitHash = "$env:APPVEYOR_REPO_COMMIT"

.\dotnet-install.ps1 -Version 2.1.300 -Architecture x86
.\dotnet-install.ps1 -Version 3.1.300 -Architecture x86
build_script:
- ps: |
$buildNumber = 0
Expand Down
2 changes: 1 addition & 1 deletion src/ExtensionsSample/ExtensionsSample.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<Import Project="..\..\build\common.props" />
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
<TargetFramework>netcoreapp3.1</TargetFramework>
<LangVersion>latest</LangVersion>
<NoWarn>$(NoWarn);8002</NoWarn>
<DocumentationFile></DocumentationFile>
Expand Down
134 changes: 134 additions & 0 deletions src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBMetricsProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger
{
internal class CosmosDBMetricsProvider
{
private readonly ILogger _logger;
private readonly Container _monitoredContainer;
private readonly Container _leaseContainer;
private readonly string _processorName;

private static readonly Dictionary<string, string> KnownDocumentClientErrors = new Dictionary<string, string>()
{
{ "Resource Not Found", "Please check that the CosmosDB container and leases container exist and are listed correctly in Functions config files." },
{ "The input authorization token can't serve the request", string.Empty },
{ "The MAC signature found in the HTTP request is not the same", string.Empty },
{ "Service is currently unavailable.", string.Empty },
{ "Entity with the specified id does not exist in the system.", string.Empty },
{ "Subscription owning the database account is disabled.", string.Empty },
{ "Request rate is large", string.Empty },
{ "The remote name could not be resolved:", string.Empty },
{ "Owner resource does not exist", string.Empty },
{ "The specified document collection is invalid", string.Empty }
};

public CosmosDBMetricsProvider(ILogger logger, Container monitoredContainer, Container leaseContainer, string processorName)
{
_logger = logger;
_monitoredContainer = monitoredContainer;
_leaseContainer = leaseContainer;
_processorName = processorName;
}

public async Task<CosmosDBTriggerMetrics> GetMetricsAsync()
{
int partitionCount = 0;
long remainingWork = 0;

try
{
List<ChangeFeedProcessorState> partitionWorkList = new List<ChangeFeedProcessorState>();
ChangeFeedEstimator estimator = _monitoredContainer.GetChangeFeedEstimator(_processorName, _leaseContainer);
using (FeedIterator<ChangeFeedProcessorState> iterator = estimator.GetCurrentStateIterator())
{
while (iterator.HasMoreResults)
{
FeedResponse<ChangeFeedProcessorState> response = await iterator.ReadNextAsync();
partitionWorkList.AddRange(response);
}
}

partitionCount = partitionWorkList.Count;
Comment thread
chiangvincent marked this conversation as resolved.
remainingWork = partitionWorkList.Sum(item => item.EstimatedLag);
}
catch (Exception e) when (e is CosmosException || e is InvalidOperationException)
Comment thread
chiangvincent marked this conversation as resolved.
{
if (!TryHandleCosmosException(e))
{
_logger.LogWarning(Events.OnScaling, "Unable to handle {0}: {1}", e.GetType().ToString(), e.Message);
if (e is InvalidOperationException)
{
throw;
}
}
}
catch (System.Net.Http.HttpRequestException e)
{
string errormsg;

var webException = e.InnerException as WebException;
if (webException != null &&
webException.Status == WebExceptionStatus.ProtocolError)
{
string statusCode = ((HttpWebResponse)webException.Response).StatusCode.ToString();
string statusDesc = ((HttpWebResponse)webException.Response).StatusDescription;
errormsg = string.Format("CosmosDBTrigger status {0}: {1}.", statusCode, statusDesc);
}
else if (webException != null &&
webException.Status == WebExceptionStatus.NameResolutionFailure)
{
errormsg = string.Format("CosmosDBTrigger Exception message: {0}.", webException.Message);
}
else
{
errormsg = e.ToString();
}

_logger.LogWarning(Events.OnScaling, errormsg);
}

return new CosmosDBTriggerMetrics
{
Timestamp = DateTime.UtcNow,
PartitionCount = partitionCount,
RemainingWork = remainingWork
};
}

// Since all exceptions in the Cosmos client are thrown as CosmosExceptions, we have to parse their error strings because we dont have access to the internal types
private bool TryHandleCosmosException(Exception exception)
{
string errormsg = null;
string exceptionMessage = exception.Message;

if (!string.IsNullOrEmpty(exceptionMessage))
{
foreach (KeyValuePair<string, string> exceptionString in KnownDocumentClientErrors)
{
if (exceptionMessage.IndexOf(exceptionString.Key, StringComparison.OrdinalIgnoreCase) >= 0)
Comment thread
ealsur marked this conversation as resolved.
{
errormsg = !string.IsNullOrEmpty(exceptionString.Value) ? exceptionString.Value : exceptionMessage;
}
}
}

if (!string.IsNullOrEmpty(errormsg))
{
_logger.LogWarning(errormsg);
return true;
}

return false;
}
}
}
174 changes: 174 additions & 0 deletions src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBScaleMonitor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Trigger
{
internal class CosmosDBScaleMonitor : IScaleMonitor<CosmosDBTriggerMetrics>
{
private readonly ILogger _logger;
private readonly string _functionId;
private readonly Container _monitoredContainer;
private readonly ScaleMonitorDescriptor _scaleMonitorDescriptor;
private readonly CosmosDBMetricsProvider _cosmosDBMetricsProvider;

/// <summary>
/// Instantiates a scale monitor for CosmosDB function.
/// </summary>
/// <param name="functionId">FunctionId of the monitored function.</param>
/// <param name="logger">Used for logging.</param>
/// <param name="monitoredContainer">Monitored container for CosmosDB function.</param>
/// <param name="leaseContainer">Lease container for CosmosDB function.</param>
/// <param name="processorName">Processor name used for function.</param>
public CosmosDBScaleMonitor(string functionId, ILogger logger, Container monitoredContainer, Container leaseContainer, string processorName)
{
_logger = logger;
_functionId = functionId;
_monitoredContainer = monitoredContainer;
_scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{_functionId}-CosmosDBTrigger-{_monitoredContainer.Database.Id}-{_monitoredContainer.Id}".ToLower(), _functionId);
_cosmosDBMetricsProvider = new CosmosDBMetricsProvider(logger, monitoredContainer, leaseContainer, processorName);
}

public ScaleMonitorDescriptor Descriptor => _scaleMonitorDescriptor;

public ScaleStatus GetScaleStatus(ScaleStatusContext<CosmosDBTriggerMetrics> context)
{
return GetScaleStatusCore(context.WorkerCount, context.Metrics?.ToArray());
}

async Task<ScaleMetrics> IScaleMonitor.GetMetricsAsync()
{
return await GetMetricsAsync().ConfigureAwait(false);
}

public async Task<CosmosDBTriggerMetrics> GetMetricsAsync()
{
return await _cosmosDBMetricsProvider.GetMetricsAsync().ConfigureAwait(false);
}

public ScaleStatus GetScaleStatus(ScaleStatusContext context)
{
context.Metrics?.Cast<CosmosDBTriggerMetrics>().ToArray();

return GetScaleStatusCore(context.WorkerCount, context.Metrics?.Cast<CosmosDBTriggerMetrics>().ToArray());
}

private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[] metrics)
{
ScaleStatus status = new ScaleStatus
{
Vote = ScaleVote.None
};

const int NumberOfSamplesToConsider = 5;

// Unable to determine the correct vote with no metrics.
if (metrics == null)
{
return status;
}

// We shouldn't assign more workers than there are partitions (Cosmos DB, Event Hub, Service Bus Queue/Topic)
// This check is first, because it is independent of load or number of samples.
int partitionCount = metrics.Length > 0 ? metrics.Last().PartitionCount : 0;
if (partitionCount > 0 && partitionCount < workerCount)
{
status.Vote = ScaleVote.ScaleIn;
_logger.LogInformation(Events.OnScaling, string.Format($"WorkerCount ({workerCount}) > PartitionCount ({partitionCount})."));
_logger.LogInformation(Events.OnScaling, string.Format($"Number of instances ({workerCount}) is too high relative to number " +
$"of partitions for container ({_monitoredContainer.Id}, {partitionCount})."));
return status;
}

// At least 5 samples are required to make a scale decision for the rest of the checks.
if (metrics.Length < NumberOfSamplesToConsider)
{
return status;
}

// Maintain a minimum ratio of 1 worker per 1,000 items of remaining work.
long latestRemainingWork = metrics.Last().RemainingWork;
if (latestRemainingWork > workerCount * 1000)
{
status.Vote = ScaleVote.ScaleOut;
_logger.LogInformation(Events.OnScaling, string.Format($"RemainingWork ({latestRemainingWork}) > WorkerCount ({workerCount}) * 1,000."));
_logger.LogInformation(Events.OnScaling, string.Format($"Remaining work for container ({_monitoredContainer.Id}, {latestRemainingWork}) " +
$"is too high relative to the number of instances ({workerCount})."));
return status;
}

bool documentsWaiting = metrics.All(m => m.RemainingWork > 0);
if (documentsWaiting && partitionCount > 0 && partitionCount > workerCount)
{
status.Vote = ScaleVote.ScaleOut;
_logger.LogInformation(Events.OnScaling, string.Format($"CosmosDB container '{_monitoredContainer.Id}' has documents waiting to be processed."));
_logger.LogInformation(Events.OnScaling, string.Format($"There are {workerCount} instances relative to {partitionCount} partitions."));
return status;
}

// Check to see if the trigger source has been empty for a while. Only if all trigger sources are empty do we scale down.
bool isIdle = metrics.All(m => m.RemainingWork == 0);
if (isIdle)
{
status.Vote = ScaleVote.ScaleIn;
_logger.LogInformation(Events.OnScaling, string.Format($"'{_monitoredContainer.Id}' is idle."));
return status;
}

// Samples are in chronological order. Check for a continuous increase in work remaining.
// If detected, this results in an automatic scale out for the site container.
bool remainingWorkIncreasing =
IsTrueForLast(
metrics,
NumberOfSamplesToConsider,
(prev, next) => prev.RemainingWork < next.RemainingWork) && metrics[0].RemainingWork > 0;
if (remainingWorkIncreasing)
{
status.Vote = ScaleVote.ScaleOut;
_logger.LogInformation(Events.OnScaling, $"Remaining work is increasing for '{_monitoredContainer.Id}'.");
return status;
}

bool remainingWorkDecreasing =
IsTrueForLast(
metrics,
NumberOfSamplesToConsider,
(prev, next) => prev.RemainingWork > next.RemainingWork);
if (remainingWorkDecreasing)
{
status.Vote = ScaleVote.ScaleIn;
_logger.LogInformation(Events.OnScaling, $"Remaining work is decreasing for '{_monitoredContainer.Id}'.");
return status;
}

_logger.LogInformation(Events.OnScaling, $"CosmosDB container '{_monitoredContainer.Id}' is steady.");

return status;
}

private static bool IsTrueForLast(IList<CosmosDBTriggerMetrics> metrics, int count, Func<CosmosDBTriggerMetrics, CosmosDBTriggerMetrics, bool> predicate)
{
Debug.Assert(count > 1, "count must be greater than 1.");
Debug.Assert(count <= metrics.Count, "count must be less than or equal to the list size.");

// Walks through the list from left to right starting at len(samples) - count.
for (int i = metrics.Count - count; i < metrics.Count - 1; i++)
{
if (!predicate(metrics[i], metrics[i + 1]))
{
return false;
}
}

return true;
}
}
}
Loading