From 5e7ad1f3201a9306b2f5bd75feac59bcc5eefb26 Mon Sep 17 00:00:00 2001 From: Matias Quaranta Date: Mon, 18 Oct 2021 15:46:58 -0700 Subject: [PATCH 01/10] Bumping version --- .../WebJobs.Extensions.CosmosDB.csproj | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj b/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj index bd778799f..7bfd3ad53 100644 --- a/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj +++ b/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj @@ -8,7 +8,7 @@ - $(CosmosDBVersion)-preview2 + $(CosmosDBVersion)-preview3 true @@ -19,7 +19,7 @@ - + From fe3afa3b957d99bf7a4f8eadb0e80649594ebe5f Mon Sep 17 00:00:00 2001 From: Matias Quaranta Date: Tue, 19 Oct 2021 07:43:57 -0700 Subject: [PATCH 02/10] adding logs --- .../Trigger/CosmosDBTriggerHealthMonitor.cs | 62 +++++++++++++++++++ .../Trigger/CosmosDBTriggerListener.cs | 9 ++- 2 files changed, 70 insertions(+), 1 deletion(-) create mode 100644 src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerHealthMonitor.cs diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerHealthMonitor.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerHealthMonitor.cs new file mode 100644 index 000000000..1ab0c93cd --- /dev/null +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerHealthMonitor.cs @@ -0,0 +1,62 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Net; +using System.Threading.Tasks; +using Microsoft.Azure.Cosmos; +using Microsoft.Extensions.Logging; + +namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB +{ + internal class CosmosDBTriggerHealthMonitor + { + private readonly ILogger logger; + + public CosmosDBTriggerHealthMonitor(ILogger logger) + { + this.logger = logger; + } + + public Task OnError(string leaseToken, Exception exception) + { + switch (exception) + { + case ChangeFeedProcessorUserException userException: + this.logger.LogWarning(userException.InnerException, "Lease {LeaseToken} encountered an unhandled user exception during processing.", leaseToken); + this.logger.LogDebug("Lease {LeaseToken} has error diagnostics {Diagnostics}", leaseToken, userException.ExceptionContext.Diagnostics); + break; + case CosmosException cosmosException when cosmosException.StatusCode == HttpStatusCode.RequestTimeout || cosmosException.StatusCode == HttpStatusCode.ServiceUnavailable: + this.logger.LogWarning(cosmosException, "Lease {LeaseToken} experiencing transient connectivity issues.", leaseToken); + break; + default: + this.logger.LogError(exception, "Lease {LeaseToken} experienced an error during processing.", leaseToken); + break; + } + + if (exception is CosmosException asCosmosException) + { + this.logger.LogDebug("Lease {LeaseToken} has error diagnostics {Diagnostics}", leaseToken, asCosmosException.Diagnostics); + } + + return Task.CompletedTask; + } + + public Task OnLeaseAcquire(string leaseToken) + { + this.logger.LogInformation("Lease {LeaseToken} was acquired to start processing.", leaseToken); + return Task.CompletedTask; + } + + public Task OnLeaseRelease(string leaseToken) + { + this.logger.LogInformation("Lease {LeaseToken} was released.", leaseToken); + return Task.CompletedTask; + } + + public void OnChangesDelivered(ChangeFeedProcessorContext context) + { + this.logger.LogDebug("Events delivered to lease {LeaseToken} consuming {RequestCharge} RU.", context.LeaseToken, context.Headers.RequestCharge); + } + } +} diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs index dc075dc23..46416c498 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs @@ -31,6 +31,7 @@ internal class CosmosDBTriggerListener : IListener, IScaleMonitor this._scaleMonitorDescriptor; @@ -151,6 +154,9 @@ internal virtual void InitializeBuilder() if (this._hostBuilder == null) { this._hostBuilder = this._monitoredContainer.GetChangeFeedProcessorBuilder(this._processorName, this.ProcessChangesAsync) + .WithErrorNotification(this._healthMonitor.OnError) + .WithLeaseAcquireNotification(this._healthMonitor.OnLeaseAcquire) + .WithLeaseReleaseNotification(this._healthMonitor.OnLeaseRelease) .WithInstanceName(this._hostName) .WithLeaseContainer(this._leaseContainer); @@ -208,8 +214,9 @@ internal virtual void InitializeBuilder() } } - private Task ProcessChangesAsync(IReadOnlyCollection docs, CancellationToken cancellationToken) + private Task ProcessChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection docs, CancellationToken cancellationToken) { + this._healthMonitor.OnChangesDelivered(context); return this._executor.TryExecuteAsync(new TriggeredFunctionData() { TriggerValue = docs }, cancellationToken); } From ca3de04d89fa22363e200410a9393e084a66d8a7 Mon Sep 17 00:00:00 2001 From: Matias Quaranta Date: Tue, 19 Oct 2021 09:38:28 -0700 Subject: [PATCH 03/10] Adding tests --- .../Trigger/CosmosDBTriggerHealthMonitor.cs | 16 +- .../Trigger/CosmosDBTriggerListener.cs | 7 +- .../CosmosDBTriggerHealthMonitorTests.cs | 191 ++++++++++++++++++ 3 files changed, 202 insertions(+), 12 deletions(-) create mode 100644 test/WebJobs.Extensions.CosmosDB.Tests/CosmosDBTriggerHealthMonitorTests.cs diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerHealthMonitor.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerHealthMonitor.cs index 1ab0c93cd..0ce6ed1df 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerHealthMonitor.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerHealthMonitor.cs @@ -18,14 +18,14 @@ public CosmosDBTriggerHealthMonitor(ILogger logger) this.logger = logger; } - public Task OnError(string leaseToken, Exception exception) + public Task OnErrorAsync(string leaseToken, Exception exception) { switch (exception) { - case ChangeFeedProcessorUserException userException: - this.logger.LogWarning(userException.InnerException, "Lease {LeaseToken} encountered an unhandled user exception during processing.", leaseToken); - this.logger.LogDebug("Lease {LeaseToken} has error diagnostics {Diagnostics}", leaseToken, userException.ExceptionContext.Diagnostics); - break; + //case ChangeFeedProcessorUserException userException: + // this.logger.LogWarning(userException.InnerException, "Lease {LeaseToken} encountered an unhandled user exception during processing.", leaseToken); + // this.logger.LogDebug("Lease {LeaseToken} has error diagnostics {Diagnostics}", leaseToken, userException.ExceptionContext.Diagnostics); + // break; case CosmosException cosmosException when cosmosException.StatusCode == HttpStatusCode.RequestTimeout || cosmosException.StatusCode == HttpStatusCode.ServiceUnavailable: this.logger.LogWarning(cosmosException, "Lease {LeaseToken} experiencing transient connectivity issues.", leaseToken); break; @@ -42,13 +42,13 @@ public Task OnError(string leaseToken, Exception exception) return Task.CompletedTask; } - public Task OnLeaseAcquire(string leaseToken) + public Task OnLeaseAcquireAsync(string leaseToken) { this.logger.LogInformation("Lease {LeaseToken} was acquired to start processing.", leaseToken); return Task.CompletedTask; } - public Task OnLeaseRelease(string leaseToken) + public Task OnLeaseReleaseAsync(string leaseToken) { this.logger.LogInformation("Lease {LeaseToken} was released.", leaseToken); return Task.CompletedTask; @@ -56,7 +56,7 @@ public Task OnLeaseRelease(string leaseToken) public void OnChangesDelivered(ChangeFeedProcessorContext context) { - this.logger.LogDebug("Events delivered to lease {LeaseToken} consuming {RequestCharge} RU.", context.LeaseToken, context.Headers.RequestCharge); + this.logger.LogDebug("Events delivered to lease {LeaseToken} with diagnostics {Diagnostics}", context.LeaseToken, context.Diagnostics); } } } diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs index 46416c498..19eb0cbaa 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs @@ -70,7 +70,6 @@ public CosmosDBTriggerListener( this._cosmosDBAttribute = cosmosDBAttribute; this._scaleMonitorDescriptor = new ScaleMonitorDescriptor($"{_functionId}-CosmosDBTrigger-{_monitoredContainer.Database.Id}-{_monitoredContainer.Id}".ToLower()); this._healthMonitor = new CosmosDBTriggerHealthMonitor(logger); - } public ScaleMonitorDescriptor Descriptor => this._scaleMonitorDescriptor; @@ -154,9 +153,9 @@ internal virtual void InitializeBuilder() if (this._hostBuilder == null) { this._hostBuilder = this._monitoredContainer.GetChangeFeedProcessorBuilder(this._processorName, this.ProcessChangesAsync) - .WithErrorNotification(this._healthMonitor.OnError) - .WithLeaseAcquireNotification(this._healthMonitor.OnLeaseAcquire) - .WithLeaseReleaseNotification(this._healthMonitor.OnLeaseRelease) + .WithErrorNotification(this._healthMonitor.OnErrorAsync) + .WithLeaseAcquireNotification(this._healthMonitor.OnLeaseAcquireAsync) + .WithLeaseReleaseNotification(this._healthMonitor.OnLeaseReleaseAsync) .WithInstanceName(this._hostName) .WithLeaseContainer(this._leaseContainer); diff --git a/test/WebJobs.Extensions.CosmosDB.Tests/CosmosDBTriggerHealthMonitorTests.cs b/test/WebJobs.Extensions.CosmosDB.Tests/CosmosDBTriggerHealthMonitorTests.cs new file mode 100644 index 000000000..249df6cb2 --- /dev/null +++ b/test/WebJobs.Extensions.CosmosDB.Tests/CosmosDBTriggerHealthMonitorTests.cs @@ -0,0 +1,191 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Net; +using System.Threading.Tasks; +using Microsoft.Azure.Cosmos; +using Microsoft.Extensions.Logging; +using Moq; +using Xunit; + +namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB.Tests +{ + public class CosmosDBTriggerHealthMonitorTests + { + [Fact] + public async Task LogsAcquire() + { + MockedLogger mockedLogger = new MockedLogger(); + CosmosDBTriggerHealthMonitor cosmosDBTriggerHealthMonitor = new CosmosDBTriggerHealthMonitor(mockedLogger); + string leaseToken = Guid.NewGuid().ToString(); + + await cosmosDBTriggerHealthMonitor.OnLeaseAcquireAsync(leaseToken); + + Assert.Single(mockedLogger.Events); + + LogEvent loggedEvent = mockedLogger.Events[0]; + Assert.Equal(LogLevel.Information, loggedEvent.LogLevel); + Assert.Null(loggedEvent.Exception); + Assert.True(loggedEvent.Message.Contains(leaseToken)); + } + + [Fact] + public async Task LogsRelease() + { + MockedLogger mockedLogger = new MockedLogger(); + CosmosDBTriggerHealthMonitor cosmosDBTriggerHealthMonitor = new CosmosDBTriggerHealthMonitor(mockedLogger); + string leaseToken = Guid.NewGuid().ToString(); + + await cosmosDBTriggerHealthMonitor.OnLeaseReleaseAsync(leaseToken); + + Assert.Single(mockedLogger.Events); + + LogEvent loggedEvent = mockedLogger.Events[0]; + Assert.Equal(LogLevel.Information, loggedEvent.LogLevel); + Assert.Null(loggedEvent.Exception); + Assert.True(loggedEvent.Message.Contains(leaseToken)); + } + + [Fact] + public void LogsOnChangesDelivered() + { + MockedLogger mockedLogger = new MockedLogger(); + CosmosDBTriggerHealthMonitor cosmosDBTriggerHealthMonitor = new CosmosDBTriggerHealthMonitor(mockedLogger); + string leaseToken = Guid.NewGuid().ToString(); + string diagnosticsString = Guid.NewGuid().ToString(); + Mock diagnostics = new Mock(); + diagnostics.Setup(m => m.ToString()).Returns(diagnosticsString); + Mock context = new Mock(); + context.Setup(m => m.LeaseToken).Returns(leaseToken); + context.Setup(m => m.Diagnostics).Returns(diagnostics.Object); + + cosmosDBTriggerHealthMonitor.OnChangesDelivered(context.Object); + + Assert.Single(mockedLogger.Events); + + LogEvent loggedEvent = mockedLogger.Events[0]; + Assert.Equal(LogLevel.Debug, loggedEvent.LogLevel); + Assert.Null(loggedEvent.Exception); + Assert.True(loggedEvent.Message.Contains(leaseToken) && loggedEvent.Message.Contains(diagnosticsString)); + } + + [Theory] + [InlineData(HttpStatusCode.RequestTimeout)] + [InlineData(HttpStatusCode.ServiceUnavailable)] + public async Task LogsTransientConnectivity(HttpStatusCode statusCode) + { + MockedLogger mockedLogger = new MockedLogger(); + CosmosDBTriggerHealthMonitor cosmosDBTriggerHealthMonitor = new CosmosDBTriggerHealthMonitor(mockedLogger); + string leaseToken = Guid.NewGuid().ToString(); + string diagnosticsString = Guid.NewGuid().ToString(); + Mock diagnostics = new Mock(); + diagnostics.Setup(m => m.ToString()).Returns(diagnosticsString); + MockedException cosmosException = new MockedException(statusCode, diagnostics.Object); + + await cosmosDBTriggerHealthMonitor.OnErrorAsync(leaseToken, cosmosException); + + Assert.Equal(2, mockedLogger.Events.Count); + + LogEvent loggedEvent = mockedLogger.Events[0]; + Assert.Equal(LogLevel.Warning, loggedEvent.LogLevel); + Assert.Equal(cosmosException, loggedEvent.Exception); + Assert.True(loggedEvent.Message.Contains(leaseToken)); + + loggedEvent = mockedLogger.Events[1]; + Assert.Equal(LogLevel.Debug, loggedEvent.LogLevel); + Assert.Null(loggedEvent.Exception); + Assert.True(loggedEvent.Message.Contains(leaseToken) && loggedEvent.Message.Contains(diagnosticsString)); + } + + //[Theory] + //public async Task LogsOnUserException() + //{ + // MockedLogger mockedLogger = new MockedLogger(); + // CosmosDBTriggerHealthMonitor cosmosDBTriggerHealthMonitor = new CosmosDBTriggerHealthMonitor(mockedLogger); + // string leaseToken = Guid.NewGuid().ToString(); + // string diagnosticsString = Guid.NewGuid().ToString(); + // Mock diagnostics = new Mock(); + // diagnostics.Setup(m => m.ToString()).Returns(diagnosticsString); + // Exception exception = new Exception(); + // Mock context = new Mock(); + // context.Setup(m => m.LeaseToken).Returns(leaseToken); + // context.Setup(m => m.Diagnostics).Returns(diagnostics.Object); + // ChangeFeedProcessorUserException userException = new ChangeFeedProcessorUserException(exception, context.Object); + + // await cosmosDBTriggerHealthMonitor.OnErrorAsync(leaseToken, userException); + + // Assert.Equal(2, mockedLogger.Events.Count); + + // LogEvent loggedEvent = mockedLogger.Events[0]; + // Assert.Equal(LogLevel.Warning, loggedEvent.LogLevel); + // Assert.Equal(exception, loggedEvent.Exception); + // Assert.True(loggedEvent.Message.Contains(leaseToken)); + + // loggedEvent = mockedLogger.Events[1]; + // Assert.Equal(LogLevel.Debug, loggedEvent.LogLevel); + // Assert.Null(loggedEvent.Exception); + // Assert.True(loggedEvent.Message.Contains(leaseToken) && loggedEvent.Message.Contains(diagnosticsString)); + //} + + [Fact] + public async Task LogsOtherException() + { + MockedLogger mockedLogger = new MockedLogger(); + CosmosDBTriggerHealthMonitor cosmosDBTriggerHealthMonitor = new CosmosDBTriggerHealthMonitor(mockedLogger); + string leaseToken = Guid.NewGuid().ToString(); + Exception otherException = new Exception(); + + await cosmosDBTriggerHealthMonitor.OnErrorAsync(leaseToken, otherException); + + Assert.Single(mockedLogger.Events); + + LogEvent loggedEvent = mockedLogger.Events[0]; + Assert.Equal(LogLevel.Error, loggedEvent.LogLevel); + Assert.Equal(otherException, loggedEvent.Exception); + Assert.True(loggedEvent.Message.Contains(leaseToken)); + } + + private class MockedException : CosmosException + { + private readonly CosmosDiagnostics diagnostics; + + public MockedException(HttpStatusCode httpStatusCode, CosmosDiagnostics diagnostics) : base("Exception!", httpStatusCode, 0, string.Empty, 0) + { + this.diagnostics = diagnostics; + } + + public override CosmosDiagnostics Diagnostics => this.diagnostics; + } + + private class MockedLogger : ILogger + { + public List Events { get; private set; } = new List(); + + public IDisposable BeginScope(TState state) + { + throw new NotImplementedException(); + } + + public bool IsEnabled(LogLevel logLevel) + { + throw new NotImplementedException(); + } + + public void Log(LogLevel logLevel, EventId eventId, TState state, Exception exception, Func formatter) + { + Events.Add(new LogEvent() { LogLevel = logLevel, Exception = exception, Message = state.ToString() }); + } + } + + private class LogEvent + { + public LogLevel LogLevel { get; set; } + + public Exception Exception { get; set; } + + public string Message { get; set; } + } + } +} From d8fbd294e4c1922f8aece3a8e83882a8af7a0920 Mon Sep 17 00:00:00 2001 From: Matias Quaranta Date: Tue, 19 Oct 2021 10:05:05 -0700 Subject: [PATCH 04/10] more file --- .../{ => Trigger}/CosmosDBTriggerHealthMonitorTests.cs | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename test/WebJobs.Extensions.CosmosDB.Tests/{ => Trigger}/CosmosDBTriggerHealthMonitorTests.cs (100%) diff --git a/test/WebJobs.Extensions.CosmosDB.Tests/CosmosDBTriggerHealthMonitorTests.cs b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBTriggerHealthMonitorTests.cs similarity index 100% rename from test/WebJobs.Extensions.CosmosDB.Tests/CosmosDBTriggerHealthMonitorTests.cs rename to test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBTriggerHealthMonitorTests.cs From 65b8327709636b046652635d7b1fc2d4c3249bd6 Mon Sep 17 00:00:00 2001 From: Matias Quaranta Date: Mon, 15 Nov 2021 07:56:09 -0800 Subject: [PATCH 05/10] Bumping to 3.23 --- .../Trigger/CosmosDBTriggerHealthMonitor.cs | 8 +-- .../WebJobs.Extensions.CosmosDB.csproj | 2 +- .../CosmosDBTriggerHealthMonitorTests.cs | 58 +++++++++---------- 3 files changed, 34 insertions(+), 34 deletions(-) diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerHealthMonitor.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerHealthMonitor.cs index 0ce6ed1df..66d6ab635 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerHealthMonitor.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerHealthMonitor.cs @@ -22,10 +22,10 @@ public Task OnErrorAsync(string leaseToken, Exception exception) { switch (exception) { - //case ChangeFeedProcessorUserException userException: - // this.logger.LogWarning(userException.InnerException, "Lease {LeaseToken} encountered an unhandled user exception during processing.", leaseToken); - // this.logger.LogDebug("Lease {LeaseToken} has error diagnostics {Diagnostics}", leaseToken, userException.ExceptionContext.Diagnostics); - // break; + case ChangeFeedProcessorUserException userException: + this.logger.LogWarning(userException.InnerException, "Lease {LeaseToken} encountered an unhandled user exception during processing.", leaseToken); + this.logger.LogDebug("Lease {LeaseToken} has error diagnostics {Diagnostics}", leaseToken, userException.ChangeFeedProcessorContext.Diagnostics); + break; case CosmosException cosmosException when cosmosException.StatusCode == HttpStatusCode.RequestTimeout || cosmosException.StatusCode == HttpStatusCode.ServiceUnavailable: this.logger.LogWarning(cosmosException, "Lease {LeaseToken} experiencing transient connectivity issues.", leaseToken); break; diff --git a/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj b/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj index 7bfd3ad53..295ef0322 100644 --- a/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj +++ b/src/WebJobs.Extensions.CosmosDB/WebJobs.Extensions.CosmosDB.csproj @@ -19,7 +19,7 @@ - + diff --git a/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBTriggerHealthMonitorTests.cs b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBTriggerHealthMonitorTests.cs index 249df6cb2..08e4ebad2 100644 --- a/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBTriggerHealthMonitorTests.cs +++ b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBTriggerHealthMonitorTests.cs @@ -99,35 +99,35 @@ public async Task LogsTransientConnectivity(HttpStatusCode statusCode) Assert.True(loggedEvent.Message.Contains(leaseToken) && loggedEvent.Message.Contains(diagnosticsString)); } - //[Theory] - //public async Task LogsOnUserException() - //{ - // MockedLogger mockedLogger = new MockedLogger(); - // CosmosDBTriggerHealthMonitor cosmosDBTriggerHealthMonitor = new CosmosDBTriggerHealthMonitor(mockedLogger); - // string leaseToken = Guid.NewGuid().ToString(); - // string diagnosticsString = Guid.NewGuid().ToString(); - // Mock diagnostics = new Mock(); - // diagnostics.Setup(m => m.ToString()).Returns(diagnosticsString); - // Exception exception = new Exception(); - // Mock context = new Mock(); - // context.Setup(m => m.LeaseToken).Returns(leaseToken); - // context.Setup(m => m.Diagnostics).Returns(diagnostics.Object); - // ChangeFeedProcessorUserException userException = new ChangeFeedProcessorUserException(exception, context.Object); - - // await cosmosDBTriggerHealthMonitor.OnErrorAsync(leaseToken, userException); - - // Assert.Equal(2, mockedLogger.Events.Count); - - // LogEvent loggedEvent = mockedLogger.Events[0]; - // Assert.Equal(LogLevel.Warning, loggedEvent.LogLevel); - // Assert.Equal(exception, loggedEvent.Exception); - // Assert.True(loggedEvent.Message.Contains(leaseToken)); - - // loggedEvent = mockedLogger.Events[1]; - // Assert.Equal(LogLevel.Debug, loggedEvent.LogLevel); - // Assert.Null(loggedEvent.Exception); - // Assert.True(loggedEvent.Message.Contains(leaseToken) && loggedEvent.Message.Contains(diagnosticsString)); - //} + [Fact] + public async Task LogsOnUserException() + { + MockedLogger mockedLogger = new MockedLogger(); + CosmosDBTriggerHealthMonitor cosmosDBTriggerHealthMonitor = new CosmosDBTriggerHealthMonitor(mockedLogger); + string leaseToken = Guid.NewGuid().ToString(); + string diagnosticsString = Guid.NewGuid().ToString(); + Mock diagnostics = new Mock(); + diagnostics.Setup(m => m.ToString()).Returns(diagnosticsString); + Exception exception = new Exception(); + Mock context = new Mock(); + context.Setup(m => m.LeaseToken).Returns(leaseToken); + context.Setup(m => m.Diagnostics).Returns(diagnostics.Object); + ChangeFeedProcessorUserException userException = new ChangeFeedProcessorUserException(exception, context.Object); + + await cosmosDBTriggerHealthMonitor.OnErrorAsync(leaseToken, userException); + + Assert.Equal(2, mockedLogger.Events.Count); + + LogEvent loggedEvent = mockedLogger.Events[0]; + Assert.Equal(LogLevel.Warning, loggedEvent.LogLevel); + Assert.Equal(exception, loggedEvent.Exception); + Assert.True(loggedEvent.Message.Contains(leaseToken)); + + loggedEvent = mockedLogger.Events[1]; + Assert.Equal(LogLevel.Debug, loggedEvent.LogLevel); + Assert.Null(loggedEvent.Exception); + Assert.True(loggedEvent.Message.Contains(leaseToken) && loggedEvent.Message.Contains(diagnosticsString)); + } [Fact] public async Task LogsOtherException() From b88d7b4141486388a5d7aab29f014c6eb5348bba Mon Sep 17 00:00:00 2001 From: Matias Quaranta Date: Tue, 16 Nov 2021 07:19:43 -0800 Subject: [PATCH 06/10] Bumping to 3.1 --- .../WebJobs.Extensions.CosmosDB.Tests.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/WebJobs.Extensions.CosmosDB.Tests/WebJobs.Extensions.CosmosDB.Tests.csproj b/test/WebJobs.Extensions.CosmosDB.Tests/WebJobs.Extensions.CosmosDB.Tests.csproj index a051f2141..5280ea94e 100644 --- a/test/WebJobs.Extensions.CosmosDB.Tests/WebJobs.Extensions.CosmosDB.Tests.csproj +++ b/test/WebJobs.Extensions.CosmosDB.Tests/WebJobs.Extensions.CosmosDB.Tests.csproj @@ -1,7 +1,7 @@  - netcoreapp2.1 + netcoreapp3.1 false Microsoft.Azure.WebJobs.Extensions.CosmosDB.Tests Microsoft.Azure.WebJobs.Extensions.CosmosDB.Tests From 22da752dfd1b672823781b7f635a6a2b2ff383bb Mon Sep 17 00:00:00 2001 From: Matias Quaranta Date: Tue, 16 Nov 2021 07:19:51 -0800 Subject: [PATCH 07/10] Adding EventId --- .../Trigger/CosmosDBTriggerHealthMonitor.cs | 20 +++++++++++-------- .../CosmosDBTriggerHealthMonitorTests.cs | 4 ++-- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerHealthMonitor.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerHealthMonitor.cs index 66d6ab635..b9edfcc71 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerHealthMonitor.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerHealthMonitor.cs @@ -11,6 +11,10 @@ namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB { internal class CosmosDBTriggerHealthMonitor { + private static readonly EventId OnError = new EventId(1, "OnTriggerError"); + private static readonly EventId OnAcquire = new EventId(2, "OnTriggerAcquire"); + private static readonly EventId OnRelease = new EventId(3, "OnTriggerRelease"); + private static readonly EventId OnDelivery = new EventId(4, "OnTriggerDelivery"); private readonly ILogger logger; public CosmosDBTriggerHealthMonitor(ILogger logger) @@ -23,20 +27,20 @@ public Task OnErrorAsync(string leaseToken, Exception exception) switch (exception) { case ChangeFeedProcessorUserException userException: - this.logger.LogWarning(userException.InnerException, "Lease {LeaseToken} encountered an unhandled user exception during processing.", leaseToken); - this.logger.LogDebug("Lease {LeaseToken} has error diagnostics {Diagnostics}", leaseToken, userException.ChangeFeedProcessorContext.Diagnostics); + this.logger.LogWarning(OnError, userException.InnerException, "Lease {LeaseToken} encountered an unhandled user exception during processing.", leaseToken); + this.logger.LogDebug(OnError, "Lease {LeaseToken} has error diagnostics {Diagnostics}", leaseToken, userException.ChangeFeedProcessorContext.Diagnostics); break; case CosmosException cosmosException when cosmosException.StatusCode == HttpStatusCode.RequestTimeout || cosmosException.StatusCode == HttpStatusCode.ServiceUnavailable: - this.logger.LogWarning(cosmosException, "Lease {LeaseToken} experiencing transient connectivity issues.", leaseToken); + this.logger.LogWarning(OnError, cosmosException, "Lease {LeaseToken} experiencing transient connectivity issues.", leaseToken); break; default: - this.logger.LogError(exception, "Lease {LeaseToken} experienced an error during processing.", leaseToken); + this.logger.LogError(OnError, exception, "Lease {LeaseToken} experienced an error during processing.", leaseToken); break; } if (exception is CosmosException asCosmosException) { - this.logger.LogDebug("Lease {LeaseToken} has error diagnostics {Diagnostics}", leaseToken, asCosmosException.Diagnostics); + this.logger.LogDebug(OnError, "Lease {LeaseToken} has error diagnostics {Diagnostics}", leaseToken, asCosmosException.Diagnostics); } return Task.CompletedTask; @@ -44,19 +48,19 @@ public Task OnErrorAsync(string leaseToken, Exception exception) public Task OnLeaseAcquireAsync(string leaseToken) { - this.logger.LogInformation("Lease {LeaseToken} was acquired to start processing.", leaseToken); + this.logger.LogDebug(OnAcquire, "Lease {LeaseToken} was acquired to start processing.", leaseToken); return Task.CompletedTask; } public Task OnLeaseReleaseAsync(string leaseToken) { - this.logger.LogInformation("Lease {LeaseToken} was released.", leaseToken); + this.logger.LogDebug(OnRelease, "Lease {LeaseToken} was released.", leaseToken); return Task.CompletedTask; } public void OnChangesDelivered(ChangeFeedProcessorContext context) { - this.logger.LogDebug("Events delivered to lease {LeaseToken} with diagnostics {Diagnostics}", context.LeaseToken, context.Diagnostics); + this.logger.LogDebug(OnDelivery, "Events delivered to lease {LeaseToken} with diagnostics {Diagnostics}", context.LeaseToken, context.Diagnostics); } } } diff --git a/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBTriggerHealthMonitorTests.cs b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBTriggerHealthMonitorTests.cs index 08e4ebad2..048d460e9 100644 --- a/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBTriggerHealthMonitorTests.cs +++ b/test/WebJobs.Extensions.CosmosDB.Tests/Trigger/CosmosDBTriggerHealthMonitorTests.cs @@ -26,7 +26,7 @@ public async Task LogsAcquire() Assert.Single(mockedLogger.Events); LogEvent loggedEvent = mockedLogger.Events[0]; - Assert.Equal(LogLevel.Information, loggedEvent.LogLevel); + Assert.Equal(LogLevel.Debug, loggedEvent.LogLevel); Assert.Null(loggedEvent.Exception); Assert.True(loggedEvent.Message.Contains(leaseToken)); } @@ -43,7 +43,7 @@ public async Task LogsRelease() Assert.Single(mockedLogger.Events); LogEvent loggedEvent = mockedLogger.Events[0]; - Assert.Equal(LogLevel.Information, loggedEvent.LogLevel); + Assert.Equal(LogLevel.Debug, loggedEvent.LogLevel); Assert.Null(loggedEvent.Exception); Assert.True(loggedEvent.Message.Contains(leaseToken)); } From fd71243030f6ccd925aedc1385c29826302021a5 Mon Sep 17 00:00:00 2001 From: Matias Quaranta Date: Tue, 16 Nov 2021 07:35:14 -0800 Subject: [PATCH 08/10] Reporting user errors --- .../Trigger/CosmosDBTriggerListener.cs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs index 19eb0cbaa..08be09b75 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs @@ -213,10 +213,16 @@ internal virtual void InitializeBuilder() } } - private Task ProcessChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection docs, CancellationToken cancellationToken) + private async Task ProcessChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection docs, CancellationToken cancellationToken) { this._healthMonitor.OnChangesDelivered(context); - return this._executor.TryExecuteAsync(new TriggeredFunctionData() { TriggerValue = docs }, cancellationToken); + FunctionResult result = await this._executor.TryExecuteAsync(new TriggeredFunctionData() { TriggerValue = docs }, cancellationToken); + if (!result.Succeeded + && result.Exception != null) + { + ChangeFeedProcessorUserException userException = new ChangeFeedProcessorUserException(result.Exception, context); + await this._healthMonitor.OnErrorAsync(context.LeaseToken, userException); + } } public async Task GetMetricsAsync() From 12af3fb5d6e56045b9496d87c7e411e0caca51d4 Mon Sep 17 00:00:00 2001 From: Matias Quaranta Date: Tue, 16 Nov 2021 07:37:36 -0800 Subject: [PATCH 09/10] Revert to 2.1 --- .../WebJobs.Extensions.CosmosDB.Tests.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/WebJobs.Extensions.CosmosDB.Tests/WebJobs.Extensions.CosmosDB.Tests.csproj b/test/WebJobs.Extensions.CosmosDB.Tests/WebJobs.Extensions.CosmosDB.Tests.csproj index 5280ea94e..a051f2141 100644 --- a/test/WebJobs.Extensions.CosmosDB.Tests/WebJobs.Extensions.CosmosDB.Tests.csproj +++ b/test/WebJobs.Extensions.CosmosDB.Tests/WebJobs.Extensions.CosmosDB.Tests.csproj @@ -1,7 +1,7 @@  - netcoreapp3.1 + netcoreapp2.1 false Microsoft.Azure.WebJobs.Extensions.CosmosDB.Tests Microsoft.Azure.WebJobs.Extensions.CosmosDB.Tests From e7564c03d97cd8a0527b105190c97f90e599f950 Mon Sep 17 00:00:00 2001 From: Matias Quaranta Date: Tue, 16 Nov 2021 08:39:03 -0800 Subject: [PATCH 10/10] Adding events to other trigger logs --- src/WebJobs.Extensions.CosmosDB/Constants.cs | 14 +++++++--- .../Trigger/CosmosDBTriggerHealthMonitor.cs | 20 ++++++-------- .../Trigger/CosmosDBTriggerListener.cs | 26 +++++++++---------- 3 files changed, 32 insertions(+), 28 deletions(-) diff --git a/src/WebJobs.Extensions.CosmosDB/Constants.cs b/src/WebJobs.Extensions.CosmosDB/Constants.cs index a2dbfb826..4e7507be5 100644 --- a/src/WebJobs.Extensions.CosmosDB/Constants.cs +++ b/src/WebJobs.Extensions.CosmosDB/Constants.cs @@ -1,9 +1,7 @@ // Copyright (c) .NET Foundation. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. -using System; -using System.Collections.Generic; -using System.Text; +using Microsoft.Extensions.Logging; namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB { @@ -11,4 +9,14 @@ public static class Constants { public const string DefaultConnectionStringName = "CosmosDB"; } + + internal static class Events + { + public static readonly EventId OnError = new EventId(1, "OnTriggerError"); + public static readonly EventId OnAcquire = new EventId(2, "OnTriggerAcquire"); + public static readonly EventId OnRelease = new EventId(3, "OnTriggerRelease"); + public static readonly EventId OnDelivery = new EventId(4, "OnTriggerDelivery"); + public static readonly EventId OnListenerStopError = new EventId(5, "OnTriggerListenerStopError"); + public static readonly EventId OnScaling = new EventId(6, "OnScaling"); + } } diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerHealthMonitor.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerHealthMonitor.cs index b9edfcc71..7aa74e58d 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerHealthMonitor.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerHealthMonitor.cs @@ -11,10 +11,6 @@ namespace Microsoft.Azure.WebJobs.Extensions.CosmosDB { internal class CosmosDBTriggerHealthMonitor { - private static readonly EventId OnError = new EventId(1, "OnTriggerError"); - private static readonly EventId OnAcquire = new EventId(2, "OnTriggerAcquire"); - private static readonly EventId OnRelease = new EventId(3, "OnTriggerRelease"); - private static readonly EventId OnDelivery = new EventId(4, "OnTriggerDelivery"); private readonly ILogger logger; public CosmosDBTriggerHealthMonitor(ILogger logger) @@ -27,20 +23,20 @@ public Task OnErrorAsync(string leaseToken, Exception exception) switch (exception) { case ChangeFeedProcessorUserException userException: - this.logger.LogWarning(OnError, userException.InnerException, "Lease {LeaseToken} encountered an unhandled user exception during processing.", leaseToken); - this.logger.LogDebug(OnError, "Lease {LeaseToken} has error diagnostics {Diagnostics}", leaseToken, userException.ChangeFeedProcessorContext.Diagnostics); + this.logger.LogWarning(Events.OnError, userException.InnerException, "Lease {LeaseToken} encountered an unhandled user exception during processing.", leaseToken); + this.logger.LogDebug(Events.OnError, "Lease {LeaseToken} has error diagnostics {Diagnostics}", leaseToken, userException.ChangeFeedProcessorContext.Diagnostics); break; case CosmosException cosmosException when cosmosException.StatusCode == HttpStatusCode.RequestTimeout || cosmosException.StatusCode == HttpStatusCode.ServiceUnavailable: - this.logger.LogWarning(OnError, cosmosException, "Lease {LeaseToken} experiencing transient connectivity issues.", leaseToken); + this.logger.LogWarning(Events.OnError, cosmosException, "Lease {LeaseToken} experiencing transient connectivity issues.", leaseToken); break; default: - this.logger.LogError(OnError, exception, "Lease {LeaseToken} experienced an error during processing.", leaseToken); + this.logger.LogError(Events.OnError, exception, "Lease {LeaseToken} experienced an error during processing.", leaseToken); break; } if (exception is CosmosException asCosmosException) { - this.logger.LogDebug(OnError, "Lease {LeaseToken} has error diagnostics {Diagnostics}", leaseToken, asCosmosException.Diagnostics); + this.logger.LogDebug(Events.OnError, "Lease {LeaseToken} has error diagnostics {Diagnostics}", leaseToken, asCosmosException.Diagnostics); } return Task.CompletedTask; @@ -48,19 +44,19 @@ public Task OnErrorAsync(string leaseToken, Exception exception) public Task OnLeaseAcquireAsync(string leaseToken) { - this.logger.LogDebug(OnAcquire, "Lease {LeaseToken} was acquired to start processing.", leaseToken); + this.logger.LogDebug(Events.OnAcquire, "Lease {LeaseToken} was acquired to start processing.", leaseToken); return Task.CompletedTask; } public Task OnLeaseReleaseAsync(string leaseToken) { - this.logger.LogDebug(OnRelease, "Lease {LeaseToken} was released.", leaseToken); + this.logger.LogDebug(Events.OnRelease, "Lease {LeaseToken} was released.", leaseToken); return Task.CompletedTask; } public void OnChangesDelivered(ChangeFeedProcessorContext context) { - this.logger.LogDebug(OnDelivery, "Events delivered to lease {LeaseToken} with diagnostics {Diagnostics}", context.LeaseToken, context.Diagnostics); + this.logger.LogDebug(Events.OnDelivery, "Events delivered to lease {LeaseToken} with diagnostics {Diagnostics}", context.LeaseToken, context.Diagnostics); } } } diff --git a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs index 08be09b75..356e2c5af 100644 --- a/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs +++ b/src/WebJobs.Extensions.CosmosDB/Trigger/CosmosDBTriggerListener.cs @@ -134,7 +134,7 @@ public async Task StopAsync(CancellationToken cancellationToken) } catch (Exception ex) { - this._logger.LogWarning($"Stopping the observer failed, potentially it was never started. Exception: {ex.Message}."); + this._logger.LogWarning(Events.OnListenerStopError, "Stopping the observer failed, potentially it was never started. Exception: {Exception}.", ex); } } @@ -250,7 +250,7 @@ public async Task GetMetricsAsync() { if (!TryHandleCosmosException(e)) { - _logger.LogWarning("Unable to handle {0}: {1}", e.GetType().ToString(), e.Message); + _logger.LogWarning(Events.OnScaling, "Unable to handle {0}: {1}", e.GetType().ToString(), e.Message); if (e is InvalidOperationException) { throw; @@ -279,7 +279,7 @@ public async Task GetMetricsAsync() errormsg = e.ToString(); } - _logger.LogWarning(errormsg); + _logger.LogWarning(Events.OnScaling, errormsg); } return new CosmosDBTriggerMetrics @@ -326,8 +326,8 @@ private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[] if (partitionCount > 0 && partitionCount < workerCount) { status.Vote = ScaleVote.ScaleIn; - _logger.LogInformation(string.Format($"WorkerCount ({workerCount}) > PartitionCount ({partitionCount}).")); - _logger.LogInformation(string.Format($"Number of instances ({workerCount}) is too high relative to number " + + _logger.LogInformation(Events.OnScaling, string.Format($"WorkerCount ({workerCount}) > PartitionCount ({partitionCount}).")); + _logger.LogInformation(Events.OnScaling, string.Format($"Number of instances ({workerCount}) is too high relative to number " + $"of partitions for container ({this._monitoredContainer.Id}, {partitionCount}).")); return status; } @@ -343,8 +343,8 @@ private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[] if (latestRemainingWork > workerCount * 1000) { status.Vote = ScaleVote.ScaleOut; - _logger.LogInformation(string.Format($"RemainingWork ({latestRemainingWork}) > WorkerCount ({workerCount}) * 1,000.")); - _logger.LogInformation(string.Format($"Remaining work for container ({this._monitoredContainer.Id}, {latestRemainingWork}) " + + _logger.LogInformation(Events.OnScaling, string.Format($"RemainingWork ({latestRemainingWork}) > WorkerCount ({workerCount}) * 1,000.")); + _logger.LogInformation(Events.OnScaling, string.Format($"Remaining work for container ({this._monitoredContainer.Id}, {latestRemainingWork}) " + $"is too high relative to the number of instances ({workerCount}).")); return status; } @@ -353,8 +353,8 @@ private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[] if (documentsWaiting && partitionCount > 0 && partitionCount > workerCount) { status.Vote = ScaleVote.ScaleOut; - _logger.LogInformation(string.Format($"CosmosDB container '{this._monitoredContainer.Id}' has documents waiting to be processed.")); - _logger.LogInformation(string.Format($"There are {workerCount} instances relative to {partitionCount} partitions.")); + _logger.LogInformation(Events.OnScaling, string.Format($"CosmosDB container '{this._monitoredContainer.Id}' has documents waiting to be processed.")); + _logger.LogInformation(Events.OnScaling, string.Format($"There are {workerCount} instances relative to {partitionCount} partitions.")); return status; } @@ -363,7 +363,7 @@ private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[] if (isIdle) { status.Vote = ScaleVote.ScaleIn; - _logger.LogInformation(string.Format($"'{this._monitoredContainer.Id}' is idle.")); + _logger.LogInformation(Events.OnScaling, string.Format($"'{this._monitoredContainer.Id}' is idle.")); return status; } @@ -377,7 +377,7 @@ private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[] if (remainingWorkIncreasing) { status.Vote = ScaleVote.ScaleOut; - _logger.LogInformation($"Remaining work is increasing for '{this._monitoredContainer.Id}'."); + _logger.LogInformation(Events.OnScaling, $"Remaining work is increasing for '{this._monitoredContainer.Id}'."); return status; } @@ -389,11 +389,11 @@ private ScaleStatus GetScaleStatusCore(int workerCount, CosmosDBTriggerMetrics[] if (remainingWorkDecreasing) { status.Vote = ScaleVote.ScaleIn; - _logger.LogInformation($"Remaining work is decreasing for '{this._monitoredContainer.Id}'."); + _logger.LogInformation(Events.OnScaling, $"Remaining work is decreasing for '{this._monitoredContainer.Id}'."); return status; } - _logger.LogInformation($"CosmosDB container '{this._monitoredContainer.Id}' is steady."); + _logger.LogInformation(Events.OnScaling, $"CosmosDB container '{this._monitoredContainer.Id}' is steady."); return status; }