diff --git a/src/WebJobs.Extensions.CosmosDB/Constants.cs b/src/WebJobs.Extensions.CosmosDB/Constants.cs index a2dbfb826..4e7507be5 100644 --- a/src/WebJobs.Extensions.CosmosDB/Constants.cs +++ b/src/WebJobs.Extensions.CosmosDB/Constants.cs @@ -1,9 +1,7 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. -using System; -using System.Collections.Generic; -using System.Text; +using Microsoft.Extensions.Logging; namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB { @@ -11,4 +9,14 @@ public static class Constants { public const string DefaultConnectionStringName = "CosmosDB"; } + + internal static class Events + { + public static readonly EventId OnError = new EventId(1, "OnTriggerError"); + public static readonly EventId OnAcquire = new EventId(2, "OnTriggerAcquire"); + public static readonly EventId OnRelease = new EventId(3, "OnTriggerRelease"); + public static readonly EventId OnDelivery = new EventId(4, "OnTriggerDelivery"); + public static readonly EventId OnListenerStopError = new EventId(5, "OnTriggerListenerStopError"); + public static readonly EventId OnScaling = new EventId(6, "OnScaling"); + } } diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerHealthMonitor.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerHealthMonitor.cs new file mode 100644 index 000000000..7aa74e58d --- /dev/null +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerHealthMonitor.cs @@ -0,0 +1,62 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Net; +using System.Threading.Tasks; +using Microsoft.Azure.Cosmos; +using Microsoft.Extensions.Logging; + +namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB +{ + internal class CosmosDBTriggerHealthMonitor + { + private readonly ILogger logger; + + public CosmosDBTriggerHealthMonitor(ILogger logger) + { + this.logger = logger; + } + + public Task OnErrorAsync(string leaseToken, Exception exception) + { + switch (exception) + { + case ChangeFeedProcessorUserException userException: + this.logger.LogWarning(Events.OnError, userException.InnerException, "Lease {LeaseToken} encountered an unhandled user exception during processing.", leaseToken); + this.logger.LogDebug(Events.OnError, "Lease {LeaseToken} has error diagnostics {Diagnostics}", leaseToken, userException.ChangeFeedProcessorContext.Diagnostics); + break; + case CosmosException cosmosException when cosmosException.StatusCode == HttpStatusCode.RequestTimeout || cosmosException.StatusCode == HttpStatusCode.ServiceUnavailable: + this.logger.LogWarning(Events.OnError, cosmosException, "Lease {LeaseToken} experiencing transient connectivity issues.", leaseToken); + break; + default: + this.logger.LogError(Events.OnError, exception, "Lease {LeaseToken} experienced an error during processing.", leaseToken); + break; + } + + if (exception is CosmosException asCosmosException) + { + this.logger.LogDebug(Events.OnError, "Lease {LeaseToken} has error diagnostics {Diagnostics}", leaseToken, asCosmosException.Diagnostics); + } + + return Task.CompletedTask; + } + + public Task OnLeaseAcquireAsync(string leaseToken) + { + this.logger.LogDebug(Events.OnAcquire, "Lease {LeaseToken} was acquired to start processing.", leaseToken); + return Task.CompletedTask; + } + + public Task OnLeaseReleaseAsync(string leaseToken) + { + this.logger.LogDebug(Events.OnRelease, "Lease {LeaseToken} was released.", leaseToken); + return Task.CompletedTask; + } + + public void OnChangesDelivered(ChangeFeedProcessorContext context) + { + this.logger.LogDebug(Events.OnDelivery, "Events delivered to lease {LeaseToken} with diagnostics {Diagnostics}", context.LeaseToken, context.Diagnostics); + } + } +} diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs index dc075dc23..356e2c5af 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs @@ -31,6 +31,7 @@ internal class CosmosDBTriggerListener : IListener, IScaleMonitor this._scaleMonitorDescriptor; @@ -132,7 +134,7 @@ public async Task StopAsync(CancellationToken cancellationToken) } catch (Exception ex) { - this._logger.LogWarning($"Stopping the observer failed, potentially it was never started. Exception: {ex.Message}."); + this._logger.LogWarning(Events.OnListenerStopError, "Stopping the observer failed, potentially it was never started. Exception: {Exception}.", ex); } } @@ -151,6 +153,9 @@ internal virtual void InitializeBuilder() if (this._hostBuilder == null) { this._hostBuilder = this._monitoredContainer.GetChangeFeedProcessorBuilder(this._processorName, this.ProcessChangesAsync) + .WithErrorNotification(this._healthMonitor.OnErrorAsync) + .WithLeaseAcquireNotification(this._healthMonitor.OnLeaseAcquireAsync) + .WithLeaseReleaseNotification(this._healthMonitor.OnLeaseReleaseAsync) .WithInstanceName(this._hostName) .WithLeaseContainer(this._leaseContainer); @@ -208,9 +213,16 @@ internal virtual void InitializeBuilder() } } - private Task ProcessChangesAsync(IReadOnlyCollection docs, CancellationToken cancellationToken) + private async Task ProcessChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection docs, CancellationToken cancellationToken) { - return this._executor.TryExecuteAsync(new TriggeredFunctionData() { TriggerValue = docs }, cancellationToken); + this._healthMonitor.OnChangesDelivered(context); + FunctionResult result = await this._executor.TryExecuteAsync(new TriggeredFunctionData() { TriggerValue = docs }, cancellationToken); + if (!result.Succeeded + && result.Exception != null) + { + ChangeFeedProcessorUserException userException = new ChangeFeedProcessorUserException(result.Exception, context); + await this._healthMonitor.OnErrorAsync(context.LeaseToken, userException); + } } public async Task GetMetricsAsync() @@ -238,7 +250,7 @@ public async Task GetMetricsAsync() { if (!TryHandleCosmosException(e)) { - _logger.LogWarning("Unable to handle {0}: {1}", e.GetType().ToString(), e.Message); + _logger.LogWarning(Events.OnScaling, "Unable to handle {0}: {1}", e.GetType().ToString(), e.Message); if (e is InvalidOperationException) { throw; @@ -267,7 +279,7 @@ public async Task GetMetricsAsync() errormsg = e.ToString(); } - _logger.LogWarning(errormsg); + _logger.LogWarning(Events.OnScaling, errormsg); } return new CosmosDBTriggerMetrics @@ -314,8 +326,8 @@ private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[] if (partitionCount > 0 && partitionCount < workerCount) { status.Vote = ScaleVote.ScaleIn; - _logger.LogInformation(string.Format($"WorkerCount ({workerCount}) > PartitionCount ({partitionCount}).")); - _logger.LogInformation(string.Format($"Number of instances ({workerCount}) is too high relative to number " + + _logger.LogInformation(Events.OnScaling, string.Format($"WorkerCount ({workerCount}) > PartitionCount ({partitionCount}).")); + _logger.LogInformation(Events.OnScaling, string.Format($"Number of instances ({workerCount}) is too high relative to number " + $"of partitions for container ({this._monitoredContainer.Id}, {partitionCount}).")); return status; } @@ -331,8 +343,8 @@ private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[] if (latestRemainingWork > workerCount * 1000) { status.Vote = ScaleVote.ScaleOut; - _logger.LogInformation(string.Format($"RemainingWork ({latestRemainingWork}) > WorkerCount ({workerCount}) * 1,000.")); - _logger.LogInformation(string.Format($"Remaining work for container ({this._monitoredContainer.Id}, {latestRemainingWork}) " + + _logger.LogInformation(Events.OnScaling, string.Format($"RemainingWork ({latestRemainingWork}) > WorkerCount ({workerCount}) * 1,000.")); + _logger.LogInformation(Events.OnScaling, string.Format($"Remaining work for container ({this._monitoredContainer.Id}, {latestRemainingWork}) " + $"is too high relative to the number of instances ({workerCount}).")); return status; } @@ -341,8 +353,8 @@ private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[] if (documentsWaiting && partitionCount > 0 && partitionCount > workerCount) { status.Vote = ScaleVote.ScaleOut; - _logger.LogInformation(string.Format($"CosmosDB container '{this._monitoredContainer.Id}' has documents waiting to be processed.")); - _logger.LogInformation(string.Format($"There are {workerCount} instances relative to {partitionCount} partitions.")); + _logger.LogInformation(Events.OnScaling, string.Format($"CosmosDB container '{this._monitoredContainer.Id}' has documents waiting to be processed.")); + _logger.LogInformation(Events.OnScaling, string.Format($"There are {workerCount} instances relative to {partitionCount} partitions.")); return status; } @@ -351,7 +363,7 @@ private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[] if (isIdle) { status.Vote = ScaleVote.ScaleIn; - _logger.LogInformation(string.Format($"'{this._monitoredContainer.Id}' is idle.")); + _logger.LogInformation(Events.OnScaling, string.Format($"'{this._monitoredContainer.Id}' is idle.")); return status; } @@ -365,7 +377,7 @@ private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[] if (remainingWorkIncreasing) { status.Vote = ScaleVote.ScaleOut; - _logger.LogInformation($"Remaining work is increasing for '{this._monitoredContainer.Id}'."); + _logger.LogInformation(Events.OnScaling, $"Remaining work is increasing for '{this._monitoredContainer.Id}'."); return status; } @@ -377,11 +389,11 @@ private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[] if (remainingWorkDecreasing) { status.Vote = ScaleVote.ScaleIn; - _logger.LogInformation($"Remaining work is decreasing for '{this._monitoredContainer.Id}'."); + _logger.LogInformation(Events.OnScaling, $"Remaining work is decreasing for '{this._monitoredContainer.Id}'."); return status; } - _logger.LogInformation($"CosmosDB container '{this._monitoredContainer.Id}' is steady."); + _logger.LogInformation(Events.OnScaling, $"CosmosDB container '{this._monitoredContainer.Id}' is steady."); return status; } diff --git a/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj b/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj index bd778799f..295ef0322 100644 --- a/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj +++ b/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj @@ -8,7 +8,7 @@ - $(CosmosDBVersion)-preview2 + $(CosmosDBVersion)-preview3 true @@ -19,7 +19,7 @@ - + diff --git a/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBTriggerHealthMonitorTests.cs b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBTriggerHealthMonitorTests.cs new file mode 100644 index 000000000..048d460e9 --- /dev/null +++ b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBTriggerHealthMonitorTests.cs @@ -0,0 +1,191 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Net; +using System.Threading.Tasks; +using Microsoft.Azure.Cosmos; +using Microsoft.Extensions.Logging; +using Moq; +using Xunit; + +namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Tests +{ + public class CosmosDBTriggerHealthMonitorTests + { + [Fact] + public async Task LogsAcquire() + { + MockedLogger mockedLogger = new MockedLogger(); + CosmosDBTriggerHealthMonitor cosmosDBTriggerHealthMonitor = new CosmosDBTriggerHealthMonitor(mockedLogger); + string leaseToken = Guid.NewGuid().ToString(); + + await cosmosDBTriggerHealthMonitor.OnLeaseAcquireAsync(leaseToken); + + Assert.Single(mockedLogger.Events); + + LogEvent loggedEvent = mockedLogger.Events[0]; + Assert.Equal(LogLevel.Debug, loggedEvent.LogLevel); + Assert.Null(loggedEvent.Exception); + Assert.True(loggedEvent.Message.Contains(leaseToken)); + } + + [Fact] + public async Task LogsRelease() + { + MockedLogger mockedLogger = new MockedLogger(); + CosmosDBTriggerHealthMonitor cosmosDBTriggerHealthMonitor = new CosmosDBTriggerHealthMonitor(mockedLogger); + string leaseToken = Guid.NewGuid().ToString(); + + await cosmosDBTriggerHealthMonitor.OnLeaseReleaseAsync(leaseToken); + + Assert.Single(mockedLogger.Events); + + LogEvent loggedEvent = mockedLogger.Events[0]; + Assert.Equal(LogLevel.Debug, loggedEvent.LogLevel); + Assert.Null(loggedEvent.Exception); + Assert.True(loggedEvent.Message.Contains(leaseToken)); + } + + [Fact] + public void LogsOnChangesDelivered() + { + MockedLogger mockedLogger = new MockedLogger(); + CosmosDBTriggerHealthMonitor cosmosDBTriggerHealthMonitor = new CosmosDBTriggerHealthMonitor(mockedLogger); + string leaseToken = Guid.NewGuid().ToString(); + string diagnosticsString = Guid.NewGuid().ToString(); + Mock diagnostics = new Mock(); + diagnostics.Setup(m => m.ToString()).Returns(diagnosticsString); + Mock context = new Mock(); + context.Setup(m => m.LeaseToken).Returns(leaseToken); + context.Setup(m => m.Diagnostics).Returns(diagnostics.Object); + + cosmosDBTriggerHealthMonitor.OnChangesDelivered(context.Object); + + Assert.Single(mockedLogger.Events); + + LogEvent loggedEvent = mockedLogger.Events[0]; + Assert.Equal(LogLevel.Debug, loggedEvent.LogLevel); + Assert.Null(loggedEvent.Exception); + Assert.True(loggedEvent.Message.Contains(leaseToken) && loggedEvent.Message.Contains(diagnosticsString)); + } + + [Theory] + [InlineData(HttpStatusCode.RequestTimeout)] + [InlineData(HttpStatusCode.ServiceUnavailable)] + public async Task LogsTransientConnectivity(HttpStatusCode statusCode) + { + MockedLogger mockedLogger = new MockedLogger(); + CosmosDBTriggerHealthMonitor cosmosDBTriggerHealthMonitor = new CosmosDBTriggerHealthMonitor(mockedLogger); + string leaseToken = Guid.NewGuid().ToString(); + string diagnosticsString = Guid.NewGuid().ToString(); + Mock diagnostics = new Mock(); + diagnostics.Setup(m => m.ToString()).Returns(diagnosticsString); + MockedException cosmosException = new MockedException(statusCode, diagnostics.Object); + + await cosmosDBTriggerHealthMonitor.OnErrorAsync(leaseToken, cosmosException); + + Assert.Equal(2, mockedLogger.Events.Count); + + LogEvent loggedEvent = mockedLogger.Events[0]; + Assert.Equal(LogLevel.Warning, loggedEvent.LogLevel); + Assert.Equal(cosmosException, loggedEvent.Exception); + Assert.True(loggedEvent.Message.Contains(leaseToken)); + + loggedEvent = mockedLogger.Events[1]; + Assert.Equal(LogLevel.Debug, loggedEvent.LogLevel); + Assert.Null(loggedEvent.Exception); + Assert.True(loggedEvent.Message.Contains(leaseToken) && loggedEvent.Message.Contains(diagnosticsString)); + } + + [Fact] + public async Task LogsOnUserException() + { + MockedLogger mockedLogger = new MockedLogger(); + CosmosDBTriggerHealthMonitor cosmosDBTriggerHealthMonitor = new CosmosDBTriggerHealthMonitor(mockedLogger); + string leaseToken = Guid.NewGuid().ToString(); + string diagnosticsString = Guid.NewGuid().ToString(); + Mock diagnostics = new Mock(); + diagnostics.Setup(m => m.ToString()).Returns(diagnosticsString); + Exception exception = new Exception(); + Mock context = new Mock(); + context.Setup(m => m.LeaseToken).Returns(leaseToken); + context.Setup(m => m.Diagnostics).Returns(diagnostics.Object); + ChangeFeedProcessorUserException userException = new ChangeFeedProcessorUserException(exception, context.Object); + + await cosmosDBTriggerHealthMonitor.OnErrorAsync(leaseToken, userException); + + Assert.Equal(2, mockedLogger.Events.Count); + + LogEvent loggedEvent = mockedLogger.Events[0]; + Assert.Equal(LogLevel.Warning, loggedEvent.LogLevel); + Assert.Equal(exception, loggedEvent.Exception); + Assert.True(loggedEvent.Message.Contains(leaseToken)); + + loggedEvent = mockedLogger.Events[1]; + Assert.Equal(LogLevel.Debug, loggedEvent.LogLevel); + Assert.Null(loggedEvent.Exception); + Assert.True(loggedEvent.Message.Contains(leaseToken) && loggedEvent.Message.Contains(diagnosticsString)); + } + + [Fact] + public async Task LogsOtherException() + { + MockedLogger mockedLogger = new MockedLogger(); + CosmosDBTriggerHealthMonitor cosmosDBTriggerHealthMonitor = new CosmosDBTriggerHealthMonitor(mockedLogger); + string leaseToken = Guid.NewGuid().ToString(); + Exception otherException = new Exception(); + + await cosmosDBTriggerHealthMonitor.OnErrorAsync(leaseToken, otherException); + + Assert.Single(mockedLogger.Events); + + LogEvent loggedEvent = mockedLogger.Events[0]; + Assert.Equal(LogLevel.Error, loggedEvent.LogLevel); + Assert.Equal(otherException, loggedEvent.Exception); + Assert.True(loggedEvent.Message.Contains(leaseToken)); + } + + private class MockedException : CosmosException + { + private readonly CosmosDiagnostics diagnostics; + + public MockedException(HttpStatusCode httpStatusCode, CosmosDiagnostics diagnostics) : base("Exception!", httpStatusCode, 0, string.Empty, 0) + { + this.diagnostics = diagnostics; + } + + public override CosmosDiagnostics Diagnostics => this.diagnostics; + } + + private class MockedLogger : ILogger + { + public List Events { get; private set; } = new List(); + + public IDisposable BeginScope(TState state) + { + throw new NotImplementedException(); + } + + public bool IsEnabled(LogLevel logLevel) + { + throw new NotImplementedException(); + } + + public void Log(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func formatter) + { + Events.Add(new LogEvent() { LogLevel = logLevel, Exception = exception, Message = state.ToString() }); + } + } + + private class LogEvent + { + public LogLevel LogLevel { get; set; } + + public Exception Exception { get; set; } + + public string Message { get; set; } + } + } +}