Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/WebJobs.Extensions.CosmosDB/Constants.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Text;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB
{
public static class Constants
{
public const string DefaultConnectionStringName = "CosmosDB";
}

internal static class Events
{
public static readonly EventId OnError = new EventId(1, "OnTriggerError");
public static readonly EventId OnAcquire = new EventId(2, "OnTriggerAcquire");
public static readonly EventId OnRelease = new EventId(3, "OnTriggerRelease");
public static readonly EventId OnDelivery = new EventId(4, "OnTriggerDelivery");
public static readonly EventId OnListenerStopError = new EventId(5, "OnTriggerListenerStopError");
public static readonly EventId OnScaling = new EventId(6, "OnScaling");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Net;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB
{
internal class CosmosDBTriggerHealthMonitor
{
private readonly ILogger logger;

public CosmosDBTriggerHealthMonitor(ILogger logger)
{
this.logger = logger;
}

public Task OnErrorAsync(string leaseToken, Exception exception)
{
switch (exception)
{
case ChangeFeedProcessorUserException userException:
this.logger.LogWarning(Events.OnError, userException.InnerException, "Lease {LeaseToken} encountered an unhandled user exception during processing.", leaseToken);
this.logger.LogDebug(Events.OnError, "Lease {LeaseToken} has error diagnostics {Diagnostics}", leaseToken, userException.ChangeFeedProcessorContext.Diagnostics);
break;
case CosmosException cosmosException when cosmosException.StatusCode == HttpStatusCode.RequestTimeout || cosmosException.StatusCode == HttpStatusCode.ServiceUnavailable:
this.logger.LogWarning(Events.OnError, cosmosException, "Lease {LeaseToken} experiencing transient connectivity issues.", leaseToken);
break;
default:
this.logger.LogError(Events.OnError, exception, "Lease {LeaseToken} experienced an error during processing.", leaseToken);
break;
}

if (exception is CosmosException asCosmosException)
{
this.logger.LogDebug(Events.OnError, "Lease {LeaseToken} has error diagnostics {Diagnostics}", leaseToken, asCosmosException.Diagnostics);
}

return Task.CompletedTask;
}

public Task OnLeaseAcquireAsync(string leaseToken)
{
this.logger.LogDebug(Events.OnAcquire, "Lease {LeaseToken} was acquired to start processing.", leaseToken);
return Task.CompletedTask;
}

public Task OnLeaseReleaseAsync(string leaseToken)
{
this.logger.LogDebug(Events.OnRelease, "Lease {LeaseToken} was released.", leaseToken);
return Task.CompletedTask;
}

public void OnChangesDelivered(ChangeFeedProcessorContext context)
{
this.logger.LogDebug(Events.OnDelivery, "Events delivered to lease {LeaseToken} with diagnostics {Diagnostics}", context.LeaseToken, context.Diagnostics);
}
}
}
42 changes: 27 additions & 15 deletions src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ internal class CosmosDBTriggerListener<T> : IListener, IScaleMonitor<CosmosDBTri
private readonly string _processorName;
private readonly string _functionId;
private readonly ScaleMonitorDescriptor _scaleMonitorDescriptor;
private readonly CosmosDBTriggerHealthMonitor _healthMonitor;
private ChangeFeedProcessor _host;
private ChangeFeedProcessorBuilder _hostBuilder;
private int _listenerStatus;
Expand Down Expand Up @@ -68,6 +69,7 @@ public CosmosDBTriggerListener(
this._leaseContainer = leaseContainer;
this._cosmosDBAttribute = cosmosDBAttribute;
this._scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{_functionId}-CosmosDBTrigger-{_monitoredContainer.Database.Id}-{_monitoredContainer.Id}".ToLower());
this._healthMonitor = new CosmosDBTriggerHealthMonitor(logger);
}

public ScaleMonitorDescriptor Descriptor => this._scaleMonitorDescriptor;
Expand Down Expand Up @@ -132,7 +134,7 @@ public async Task StopAsync(CancellationToken cancellationToken)
}
catch (Exception ex)
{
this._logger.LogWarning($"Stopping the observer failed, potentially it was never started. Exception: {ex.Message}.");
this._logger.LogWarning(Events.OnListenerStopError, "Stopping the observer failed, potentially it was never started. Exception: {Exception}.", ex);
}
}

Expand All @@ -151,6 +153,9 @@ internal virtual void InitializeBuilder()
if (this._hostBuilder == null)
{
this._hostBuilder = this._monitoredContainer.GetChangeFeedProcessorBuilder<T>(this._processorName, this.ProcessChangesAsync)
.WithErrorNotification(this._healthMonitor.OnErrorAsync)
.WithLeaseAcquireNotification(this._healthMonitor.OnLeaseAcquireAsync)
.WithLeaseReleaseNotification(this._healthMonitor.OnLeaseReleaseAsync)
.WithInstanceName(this._hostName)
.WithLeaseContainer(this._leaseContainer);

Expand Down Expand Up @@ -208,9 +213,16 @@ internal virtual void InitializeBuilder()
}
}

private Task ProcessChangesAsync(IReadOnlyCollection<T> docs, CancellationToken cancellationToken)
private async Task ProcessChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<T> docs, CancellationToken cancellationToken)
{
return this._executor.TryExecuteAsync(new TriggeredFunctionData() { TriggerValue = docs }, cancellationToken);
this._healthMonitor.OnChangesDelivered(context);
FunctionResult result = await this._executor.TryExecuteAsync(new TriggeredFunctionData() { TriggerValue = docs }, cancellationToken);
if (!result.Succeeded
&& result.Exception != null)
{
ChangeFeedProcessorUserException userException = new ChangeFeedProcessorUserException(result.Exception, context);
await this._healthMonitor.OnErrorAsync(context.LeaseToken, userException);
}
}

public async Task<CosmosDBTriggerMetrics> GetMetricsAsync()
Expand Down Expand Up @@ -238,7 +250,7 @@ public async Task<CosmosDBTriggerMetrics> GetMetricsAsync()
{
if (!TryHandleCosmosException(e))
{
_logger.LogWarning("Unable to handle {0}: {1}", e.GetType().ToString(), e.Message);
_logger.LogWarning(Events.OnScaling, "Unable to handle {0}: {1}", e.GetType().ToString(), e.Message);
if (e is InvalidOperationException)
{
throw;
Expand Down Expand Up @@ -267,7 +279,7 @@ public async Task<CosmosDBTriggerMetrics> GetMetricsAsync()
errormsg = e.ToString();
}

_logger.LogWarning(errormsg);
_logger.LogWarning(Events.OnScaling, errormsg);
}

return new CosmosDBTriggerMetrics
Expand Down Expand Up @@ -314,8 +326,8 @@ private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[]
if (partitionCount > 0 && partitionCount < workerCount)
{
status.Vote = ScaleVote.ScaleIn;
_logger.LogInformation(string.Format($"WorkerCount ({workerCount}) > PartitionCount ({partitionCount})."));
_logger.LogInformation(string.Format($"Number of instances ({workerCount}) is too high relative to number " +
_logger.LogInformation(Events.OnScaling, string.Format($"WorkerCount ({workerCount}) > PartitionCount ({partitionCount})."));
_logger.LogInformation(Events.OnScaling, string.Format($"Number of instances ({workerCount}) is too high relative to number " +
$"of partitions for container ({this._monitoredContainer.Id}, {partitionCount})."));
return status;
}
Expand All @@ -331,8 +343,8 @@ private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[]
if (latestRemainingWork > workerCount * 1000)
{
status.Vote = ScaleVote.ScaleOut;
_logger.LogInformation(string.Format($"RemainingWork ({latestRemainingWork}) > WorkerCount ({workerCount}) * 1,000."));
_logger.LogInformation(string.Format($"Remaining work for container ({this._monitoredContainer.Id}, {latestRemainingWork}) " +
_logger.LogInformation(Events.OnScaling, string.Format($"RemainingWork ({latestRemainingWork}) > WorkerCount ({workerCount}) * 1,000."));
_logger.LogInformation(Events.OnScaling, string.Format($"Remaining work for container ({this._monitoredContainer.Id}, {latestRemainingWork}) " +
$"is too high relative to the number of instances ({workerCount})."));
return status;
}
Expand All @@ -341,8 +353,8 @@ private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[]
if (documentsWaiting && partitionCount > 0 && partitionCount > workerCount)
{
status.Vote = ScaleVote.ScaleOut;
_logger.LogInformation(string.Format($"CosmosDB container '{this._monitoredContainer.Id}' has documents waiting to be processed."));
_logger.LogInformation(string.Format($"There are {workerCount} instances relative to {partitionCount} partitions."));
_logger.LogInformation(Events.OnScaling, string.Format($"CosmosDB container '{this._monitoredContainer.Id}' has documents waiting to be processed."));
_logger.LogInformation(Events.OnScaling, string.Format($"There are {workerCount} instances relative to {partitionCount} partitions."));
return status;
}

Expand All @@ -351,7 +363,7 @@ private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[]
if (isIdle)
{
status.Vote = ScaleVote.ScaleIn;
_logger.LogInformation(string.Format($"'{this._monitoredContainer.Id}' is idle."));
_logger.LogInformation(Events.OnScaling, string.Format($"'{this._monitoredContainer.Id}' is idle."));
return status;
}

Expand All @@ -365,7 +377,7 @@ private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[]
if (remainingWorkIncreasing)
{
status.Vote = ScaleVote.ScaleOut;
_logger.LogInformation($"Remaining work is increasing for '{this._monitoredContainer.Id}'.");
_logger.LogInformation(Events.OnScaling, $"Remaining work is increasing for '{this._monitoredContainer.Id}'.");
return status;
}

Expand All @@ -377,11 +389,11 @@ private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[]
if (remainingWorkDecreasing)
{
status.Vote = ScaleVote.ScaleIn;
_logger.LogInformation($"Remaining work is decreasing for '{this._monitoredContainer.Id}'.");
_logger.LogInformation(Events.OnScaling, $"Remaining work is decreasing for '{this._monitoredContainer.Id}'.");
return status;
}

_logger.LogInformation($"CosmosDB container '{this._monitoredContainer.Id}' is steady.");
_logger.LogInformation(Events.OnScaling, $"CosmosDB container '{this._monitoredContainer.Id}' is steady.");

return status;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
</PropertyGroup>
<Import Project="..\..\build\common.props" />
<PropertyGroup>
<Version>$(CosmosDBVersion)-preview2</Version>
<Version>$(CosmosDBVersion)-preview3</Version>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">
<TreatWarningsAsErrors>true</TreatWarningsAsErrors>
Expand All @@ -19,7 +19,7 @@
<WarningsAsErrors />
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.20.1" />
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.23.0" />
<PackageReference Include="Microsoft.Azure.WebJobs" Version="3.0.31" />
<PackageReference Include="Microsoft.CSharp" Version="4.5.0" />
<PackageReference Include="Microsoft.Extensions.Azure" Version="1.1.0" />
Expand Down
Loading