From c24be82fc8185b8ae4d80f8cc506c540f0ac2b0e Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Tue, 28 Oct 2025 12:35:56 -0500 Subject: [PATCH] Changes for CritterWatch Metrics Collection Closes GH-1791 Little bit of spiking Preliminary spiking on the new metrics publishing modes Added the new MetricsTests project Unit tests on the instruments within the new metrics subsystem unit tests on MessageHandlingCounts and PerTenantTracking Added InstrumentPump for later Tests on MessageTypeMetricsAccumulator Spiked in HybridMetricsPublishingMessageTracker Added node number to MessageHandlingMetrics Hung the metrics accumulator off of IWolverineRuntime Put the necessary logic to build different message trackers based on metrics modes Put tests in place for the metrics accumulator --- .../CoreTests/Runtime/MockWolverineRuntime.cs | 4 + .../CoreTests/WolverineOptionsTests.cs | 8 + src/Testing/MetricsTests/InstrumentPump.cs | 109 +++++++++++++ src/Testing/MetricsTests/InstrumentsTests.cs | 72 +++++++++ .../MessageHandlingCountsTests.cs | 51 ++++++ src/Testing/MetricsTests/MessagePump.cs | 146 ++++++++++++++++++ src/Testing/MetricsTests/MetricsTests.csproj | 26 ++++ .../MetricsTests/PerTenantTrackingTests.cs | 28 ++++ ...cs_for_a_single_message_and_destination.cs | 63 ++++++++ .../end_to_end_publishing_to_critter_watch.cs | 65 ++++++++ src/Wolverine/Envelope.Internals.cs | 5 + src/Wolverine/Runtime/Handlers/Executor.cs | 10 +- src/Wolverine/Runtime/IWolverineRuntime.cs | 4 + .../Runtime/MessageSucceededContinuation.cs | 10 +- .../Runtime/Metrics/IHandlerMetricsData.cs | 7 + .../Runtime/Metrics/MessageHandlingCounts.cs | 35 +++++ .../Runtime/Metrics/MessageHandlingMetrics.cs | 18 +++ .../Runtime/Metrics/MessageMetrics.cs | 8 + .../Metrics/MessageTypeMetricsAccumulator.cs | 66 ++++++++ .../Runtime/Metrics/MetricsAccumulator.cs | 104 +++++++++++++ .../Runtime/Metrics/PerTenantTracking.cs | 55 +++++++ .../Runtime/Metrics/RecordDeadLetter.cs | 12 ++ .../Runtime/Metrics/RecordEffectiveTime.cs | 10 ++ .../Runtime/Metrics/RecordExecutionTime.cs | 10 ++ .../Runtime/Metrics/RecordFailure.cs | 12 ++ .../Runtime/Wolverine.ExecutorFactory.cs | 20 ++- .../Runtime/WolverineRuntime.DirectMetrics.cs | 118 ++++++++++++++ .../Runtime/WolverineRuntime.Disposal.cs | 5 + .../Runtime/WolverineRuntime.HostService.cs | 13 +- ...e.HybridMetricsPublishingMessageTracker.cs | 101 ++++++++++++ .../Runtime/WolverineRuntime.Tracking.cs | 9 ++ src/Wolverine/Runtime/WolverineRuntime.cs | 8 +- src/Wolverine/Util/Dataflow/BatchingBlock.cs | 87 ----------- src/Wolverine/WolverineOptions.cs | 39 +++++ wolverine.sln | 15 ++ 35 files changed, 1257 insertions(+), 96 deletions(-) create mode 100644 src/Testing/MetricsTests/InstrumentPump.cs create mode 100644 src/Testing/MetricsTests/InstrumentsTests.cs create mode 100644 src/Testing/MetricsTests/MessageHandlingCountsTests.cs create mode 100644 src/Testing/MetricsTests/MessagePump.cs create mode 100644 src/Testing/MetricsTests/MetricsTests.csproj create mode 100644 src/Testing/MetricsTests/PerTenantTrackingTests.cs create mode 100644 src/Testing/MetricsTests/accumulating_metrics_for_a_single_message_and_destination.cs create mode 100644 src/Testing/MetricsTests/end_to_end_publishing_to_critter_watch.cs create mode 100644 src/Wolverine/Runtime/Metrics/IHandlerMetricsData.cs create mode 100644 src/Wolverine/Runtime/Metrics/MessageHandlingCounts.cs create mode 100644 src/Wolverine/Runtime/Metrics/MessageHandlingMetrics.cs create mode 100644 src/Wolverine/Runtime/Metrics/MessageMetrics.cs create mode 100644 src/Wolverine/Runtime/Metrics/MessageTypeMetricsAccumulator.cs create mode 100644 src/Wolverine/Runtime/Metrics/MetricsAccumulator.cs create mode 100644 src/Wolverine/Runtime/Metrics/PerTenantTracking.cs create mode 100644 src/Wolverine/Runtime/Metrics/RecordDeadLetter.cs create mode 100644 src/Wolverine/Runtime/Metrics/RecordEffectiveTime.cs create mode 100644 src/Wolverine/Runtime/Metrics/RecordExecutionTime.cs create mode 100644 src/Wolverine/Runtime/Metrics/RecordFailure.cs create mode 100644 src/Wolverine/Runtime/WolverineRuntime.DirectMetrics.cs create mode 100644 src/Wolverine/Runtime/WolverineRuntime.HybridMetricsPublishingMessageTracker.cs delete mode 100644 src/Wolverine/Util/Dataflow/BatchingBlock.cs diff --git a/src/Testing/CoreTests/Runtime/MockWolverineRuntime.cs b/src/Testing/CoreTests/Runtime/MockWolverineRuntime.cs index c1457532c..536d8ca28 100644 --- a/src/Testing/CoreTests/Runtime/MockWolverineRuntime.cs +++ b/src/Testing/CoreTests/Runtime/MockWolverineRuntime.cs @@ -12,6 +12,7 @@ using Wolverine.Runtime; using Wolverine.Runtime.Agents; using Wolverine.Runtime.Handlers; +using Wolverine.Runtime.Metrics; using Wolverine.Runtime.RemoteInvocation; using Wolverine.Runtime.Routing; using Wolverine.Transports; @@ -51,8 +52,11 @@ public class MockWolverineRuntime : IWolverineRuntime, IObserver(); void IObserver.OnCompleted() diff --git a/src/Testing/CoreTests/WolverineOptionsTests.cs b/src/Testing/CoreTests/WolverineOptionsTests.cs index 626b146e7..9f184fd48 100644 --- a/src/Testing/CoreTests/WolverineOptionsTests.cs +++ b/src/Testing/CoreTests/WolverineOptionsTests.cs @@ -251,6 +251,14 @@ public void enable_remote_invocation_is_true_by_default() new WolverineOptions().EnableRemoteInvocation.ShouldBeTrue(); } + [Fact] + public void metrics_defaults() + { + var options = new WolverineOptions(); + options.Metrics.Mode.ShouldBe(WolverineMetricsMode.SystemDiagnosticsMeter); + options.Metrics.SamplingPeriod.ShouldBe(5.Seconds()); + } + public interface IFoo; public class Foo : IFoo; diff --git a/src/Testing/MetricsTests/InstrumentPump.cs b/src/Testing/MetricsTests/InstrumentPump.cs new file mode 100644 index 000000000..67139ac73 --- /dev/null +++ b/src/Testing/MetricsTests/InstrumentPump.cs @@ -0,0 +1,109 @@ +using System.Transactions; +using JasperFx; +using JasperFx.Core.Reflection; +using JasperFx.MultiTenancy; +using Wolverine.Runtime.Metrics; + +namespace MetricsTests; + +public class InstrumentPump +{ + private readonly string[] _tenants; + + private readonly string[] _exceptions = + [ + typeof(BadImageFormatException).FullNameInCode(), typeof(DivideByZeroException).FullNameInCode(), + typeof(InvalidTimeZoneException).FullNameInCode(), typeof(UnknownTenantIdException).FullNameInCode() + ]; + + public InstrumentPump(string[] tenants) + { + _tenants = tenants; + } + + public List Data { get; } = new(); + + public PerTenantMetrics GetExpectedForTenantId(string tenantId) + { + var tracking = new PerTenantTracking(tenantId); + foreach (var data in Data.Where(x => x.TenantId == tenantId)) + { + data.Apply(tracking); + } + + return tracking.CompileAndReset(); + } + + public async Task Publish(int number, MessageTypeMetricsAccumulator accumulator) + { + Func publish = d => + { + Data.Add(d); + return accumulator.EntryPoint.PostAsync(d); + }; + + for (int i = 0; i < number; i++) + { + var tenantId = determineTenantId(); + + var random = Random.Shared.Next(0, 10); + + if (random < 6) + { + var executionTime = Random.Shared.Next(50, 1000); + var effectiveTime = executionTime + Random.Shared.NextDouble(); + await publish(new RecordExecutionTime(executionTime, tenantId)); + await publish(new RecordEffectiveTime(effectiveTime, tenantId)); + } + else if (random <= 9) + { + var exceptionType = _exceptions[Random.Shared.Next(0, _exceptions.Length - 1)]; + await publish(new RecordFailure(exceptionType, tenantId)); + } + else + { + var exceptionType = _exceptions[Random.Shared.Next(0, _exceptions.Length - 1)]; + await publish(new RecordDeadLetter(exceptionType, tenantId)); + } + } + } + + public async Task Publish(int number, string tenantId, MessageTypeMetricsAccumulator accumulator) + { + Func publish = d => + { + Data.Add(d); + return accumulator.EntryPoint.PostAsync(d); + }; + + for (int i = 0; i < number; i++) + { + var random = Random.Shared.Next(0, 10); + + if (random < 6) + { + var executionTime = Random.Shared.Next(50, 1000); + var effectiveTime = executionTime + Random.Shared.NextDouble(); + await publish(new RecordExecutionTime(executionTime, tenantId)); + await publish(new RecordEffectiveTime(effectiveTime, tenantId)); + } + else if (random <= 9) + { + var exceptionType = _exceptions[Random.Shared.Next(0, _exceptions.Length - 1)]; + await publish(new RecordFailure(exceptionType, tenantId)); + } + else + { + var exceptionType = _exceptions[Random.Shared.Next(0, _exceptions.Length - 1)]; + await publish(new RecordDeadLetter(exceptionType, tenantId)); + } + } + } + + private string determineTenantId() + { + if (_tenants.Length <= 1) return StorageConstants.DefaultTenantId; + + return _tenants[Random.Shared.Next(0, _tenants.Length - 1)]; + } +} \ No newline at end of file diff --git a/src/Testing/MetricsTests/InstrumentsTests.cs b/src/Testing/MetricsTests/InstrumentsTests.cs new file mode 100644 index 000000000..f1067cede --- /dev/null +++ b/src/Testing/MetricsTests/InstrumentsTests.cs @@ -0,0 +1,72 @@ +using JasperFx.Core.Reflection; +using Shouldly; +using Wolverine.Runtime.Metrics; + +namespace MetricsTests; + +public class InstrumentsTests +{ + [Fact] + public void record_failure() + { + var exceptionType = typeof(BadImageFormatException).FullNameInCode(); + var failure = new RecordFailure(exceptionType, "t1"); + + var tracking = new PerTenantTracking("t1"); + + failure.Apply(tracking); + tracking.Failures[exceptionType].ShouldBe(1); + tracking.DeadLetterCounts.ContainsKey(exceptionType).ShouldBeFalse(); + } + + [Fact] + public void record_dead_letter() + { + var exceptionType = typeof(BadImageFormatException).FullNameInCode(); + var deadLetter = new RecordDeadLetter(exceptionType, "t1"); + + var tracking = new PerTenantTracking("t1"); + + deadLetter.Apply(tracking); + tracking.DeadLetterCounts[exceptionType].ShouldBe(1); + tracking.Failures.ContainsKey(exceptionType).ShouldBeFalse(); + } + + [Fact] + public void record_effective_time() + { + var effectiveTime1 = new RecordEffectiveTime(11.2, "t1"); + var effectiveTime2 = new RecordEffectiveTime(2.1, "t1"); + var effectiveTime3 = new RecordEffectiveTime(3.5, "t1"); + + var tracking = new PerTenantTracking("t1"); + + effectiveTime1.Apply(tracking); + effectiveTime2.Apply(tracking); + effectiveTime3.Apply(tracking); + + tracking.TotalEffectiveTime.ShouldBe(effectiveTime1.Time + effectiveTime2.Time + effectiveTime3.Time); + tracking.Completions.ShouldBe(3); + } + + [Fact] + public void record_execution_time() + { + var execution1 = new RecordExecutionTime(Random.Shared.Next(100, 1000), "t1"); + var execution2 = new RecordExecutionTime(Random.Shared.Next(100, 1000), "t1"); + var execution3 = new RecordExecutionTime(Random.Shared.Next(100, 1000), "t1"); + var execution4 = new RecordExecutionTime(Random.Shared.Next(100, 1000), "t1"); + + var tracking = new PerTenantTracking("t1"); + + execution1.Apply(tracking); + execution2.Apply(tracking); + execution3.Apply(tracking); + execution4.Apply(tracking); + + tracking.Executions.ShouldBe(4); + tracking.TotalExecutionTime.ShouldBe(execution1.Time + execution2.Time + execution3.Time + execution4.Time); + + } + +} \ No newline at end of file diff --git a/src/Testing/MetricsTests/MessageHandlingCountsTests.cs b/src/Testing/MetricsTests/MessageHandlingCountsTests.cs new file mode 100644 index 000000000..1a2af0686 --- /dev/null +++ b/src/Testing/MetricsTests/MessageHandlingCountsTests.cs @@ -0,0 +1,51 @@ +using Shouldly; +using Wolverine.Runtime.Metrics; + +namespace MetricsTests; + +public class MessageHandlingCountsTests +{ + [Fact] + public void instrument_across_tenants() + { + var counts = new MessageHandlingCounts("m1", new Uri("stub://one")); + + counts.Increment(new RecordExecutionTime(23, "t1")); + counts.Increment(new RecordExecutionTime(45, "t2")); + counts.Increment(new RecordExecutionTime(55, "t1")); + counts.Increment(new RecordExecutionTime(20, "t3")); + counts.Increment(new RecordExecutionTime(117, "t3")); + counts.Increment(new RecordExecutionTime(10, "t2")); + + counts.PerTenant["t1"].Executions.ShouldBe(2); + counts.PerTenant["t1"].TotalExecutionTime.ShouldBe(23 + 55); + + counts.PerTenant["t2"].Executions.ShouldBe(2); + counts.PerTenant["t2"].TotalExecutionTime.ShouldBe(45 + 10); + + counts.PerTenant["t3"].Executions.ShouldBe(2); + counts.PerTenant["t3"].TotalExecutionTime.ShouldBe(20 + 117); + } + + [Fact] + public void clear() + { + var counts = new MessageHandlingCounts("m1", new Uri("stub://one")); + counts.Increment(new RecordExecutionTime(23, "t1")); + counts.Increment(new RecordExecutionTime(45, "t2")); + counts.Increment(new RecordExecutionTime(55, "t1")); + counts.Increment(new RecordExecutionTime(20, "t3")); + counts.Increment(new RecordExecutionTime(117, "t3")); + counts.Increment(new RecordExecutionTime(10, "t2")); + counts.Clear(); + + counts.PerTenant["t1"].Executions.ShouldBe(0); + counts.PerTenant["t1"].TotalExecutionTime.ShouldBe(0); + + counts.PerTenant["t2"].Executions.ShouldBe(0); + counts.PerTenant["t2"].TotalExecutionTime.ShouldBe(0); + + counts.PerTenant["t3"].Executions.ShouldBe(0); + counts.PerTenant["t3"].TotalExecutionTime.ShouldBe(0); + } +} \ No newline at end of file diff --git a/src/Testing/MetricsTests/MessagePump.cs b/src/Testing/MetricsTests/MessagePump.cs new file mode 100644 index 000000000..9650abb0f --- /dev/null +++ b/src/Testing/MetricsTests/MessagePump.cs @@ -0,0 +1,146 @@ +using System.Collections.Immutable; +using System.Diagnostics; +using Microsoft.Extensions.Hosting; +using Wolverine; +using Wolverine.ErrorHandling; +using Wolverine.Runtime.Metrics; + +namespace MetricsTests; + +public class MessagePump : IAsyncDisposable +{ + private IHost _host; + + public async Task StartHostAsync(WolverineMetricsMode mode) + { + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Metrics.Mode = mode; + opts.OnAnyException().RetryTimes(3).Then.MoveToErrorQueue(); + + opts.LocalQueueFor().Sequential(); + }).StartAsync(); + } + + public async Task PumpMessagesAsync(WolverineMetricsMode mode, TimeSpan duration) + { + if (_host == null) + { + await StartHostAsync(mode); + } + + MetricsCollectionHandler.Clear(); + + var stopwatch = new Stopwatch(); + stopwatch.Start(); + + while (stopwatch.ElapsedMilliseconds < duration.TotalMilliseconds) + { + var bus = _host.MessageBus(); + for (int i = 0; i < 100; i++) + { + for (int j = 0; j < Random.Shared.Next(1, 5); j++) + { + await bus.PublishAsync(new M1(Guid.CreateVersion7())); + } + + for (int j = 0; j < Random.Shared.Next(1, 5); j++) + { + await bus.PublishAsync(new M2(Guid.CreateVersion7())); + } + + for (int j = 0; j < Random.Shared.Next(1, 5); j++) + { + await bus.PublishAsync(new M3(Guid.CreateVersion7(), Random.Shared.Next(0, 10))); + } + + for (int j = 0; j < Random.Shared.Next(1, 5); j++) + { + await bus.PublishAsync(new M4(Guid.CreateVersion7(), Random.Shared.Next(0, 10))); + } + + for (int j = 0; j < Random.Shared.Next(1, 5); j++) + { + await bus.PublishAsync(new M5(Guid.CreateVersion7())); + } + + } + } + } + + public async ValueTask DisposeAsync() + { + if (_host is IAsyncDisposable hostAsyncDisposable) + { + await hostAsyncDisposable.DisposeAsync(); + } + else + { + _host.Dispose(); + } + } +} + +public static class MetricsCollectionHandler +{ + public static ImmutableArray Collected { get; private set; } + = ImmutableArray.Empty; + + + public static void Clear() + { + Collected = ImmutableArray.Empty; + } + + public static void Handle(MessageHandlingMetrics metrics) + { + Collected = Collected.Add(metrics); + } +} + +public record M1(Guid Id); +public record M2(Guid Id); + +// Need to retry on errors to see that happen +public record M3(Guid Id, int Fails); +public record M4(Guid Id, int Fails); +public record M5(Guid Id); + +public static class MessagesHandler +{ + public static async Task HandleAsync(M1 m) + { + await Task.Delay(Random.Shared.Next(25, 100)); + } + + public static async Task HandleAsync(M2 m) + { + await Task.Delay(Random.Shared.Next(25, 100)); + } + + public static async Task HandleAsync(M3 m, Envelope envelope) + { + if (m.Fails > envelope.Attempts) + { + throw new BadImageFormatException(); + } + + await Task.Delay(Random.Shared.Next(25, 100)); + } + + public static async Task HandleAsync(M4 m, Envelope envelope) + { + if (m.Fails > envelope.Attempts) + { + throw new DivideByZeroException(); + } + + await Task.Delay(Random.Shared.Next(25, 100)); + } + + public static async Task HandleAsync(M4 m) + { + await Task.Delay(Random.Shared.Next(25, 100)); + } +} \ No newline at end of file diff --git a/src/Testing/MetricsTests/MetricsTests.csproj b/src/Testing/MetricsTests/MetricsTests.csproj new file mode 100644 index 000000000..f37e99173 --- /dev/null +++ b/src/Testing/MetricsTests/MetricsTests.csproj @@ -0,0 +1,26 @@ + + + + net9.0 + enable + enable + false + + + + + + + + + + + + + + + + + + + diff --git a/src/Testing/MetricsTests/PerTenantTrackingTests.cs b/src/Testing/MetricsTests/PerTenantTrackingTests.cs new file mode 100644 index 000000000..329f83959 --- /dev/null +++ b/src/Testing/MetricsTests/PerTenantTrackingTests.cs @@ -0,0 +1,28 @@ +using Shouldly; +using Wolverine.Runtime.Metrics; + +namespace MetricsTests; + +public class PerTenantTrackingTests +{ + [Fact] + public void per_tenant_clear() + { + var perTenant = new PerTenantTracking("t1"); + perTenant.Executions++; + perTenant.TotalExecutionTime += 10; + perTenant.DeadLetterCounts["foo"] = 1; + perTenant.Failures["bar"] = 1; + perTenant.Completions++; + perTenant.TotalEffectiveTime = 22.4; + + perTenant.Clear(); + + perTenant.Executions.ShouldBe(0); + perTenant.TotalExecutionTime.ShouldBe(0); + perTenant.DeadLetterCounts.Count.ShouldBe(0); + perTenant.Failures.Count.ShouldBe(0); + perTenant.Completions.ShouldBe(0); + perTenant.TotalEffectiveTime.ShouldBe(0); + } +} \ No newline at end of file diff --git a/src/Testing/MetricsTests/accumulating_metrics_for_a_single_message_and_destination.cs b/src/Testing/MetricsTests/accumulating_metrics_for_a_single_message_and_destination.cs new file mode 100644 index 000000000..37f2ae0fa --- /dev/null +++ b/src/Testing/MetricsTests/accumulating_metrics_for_a_single_message_and_destination.cs @@ -0,0 +1,63 @@ +using JasperFx; +using Shouldly; +using Wolverine.Runtime.Metrics; + +namespace MetricsTests; + +public class accumulating_metrics_for_a_single_message_and_destination +{ + [Fact] + public async Task pump_in_data_for_single_tenanted() + { + var accumulator = new MessageTypeMetricsAccumulator("m1", new Uri("stub://one")); + + var pump = new InstrumentPump([]); + await pump.Publish(1000, accumulator); + + var tracking = new PerTenantTracking(StorageConstants.DefaultTenantId); + foreach (var data in pump.Data) + { + data.Apply(tracking); + } + + var expected = tracking.CompileAndReset(); + + await accumulator.EntryPoint.WaitForCompletionAsync(); + + var dump = accumulator.TriggerExport(3); + var actual = dump.PerTenant.Single(); + + actual.ShouldMatch(expected); + } + + [Fact] + public async Task pump_in_data_for_multiple_tenants() + { + var accumulator = new MessageTypeMetricsAccumulator("m1", new Uri("stub://one")); + + var pump = new InstrumentPump(["t1", "t2", "t3", "t4"]); + await pump.Publish(1000, "t1", accumulator); + await pump.Publish(1000, "t2", accumulator); + await pump.Publish(1000, "t3", accumulator); + await pump.Publish(1000, "t4", accumulator); + + await accumulator.EntryPoint.WaitForCompletionAsync(); + + var metrics = accumulator.TriggerExport(3); + metrics.PerTenant[0].ShouldMatch(pump.GetExpectedForTenantId("t1")); + metrics.PerTenant[1].ShouldMatch(pump.GetExpectedForTenantId("t2")); + metrics.PerTenant[2].ShouldMatch(pump.GetExpectedForTenantId("t3")); + metrics.PerTenant[3].ShouldMatch(pump.GetExpectedForTenantId("t4")); + } +} + +public static class TestingExtensions +{ + public static void ShouldMatch(this PerTenantMetrics actual, PerTenantMetrics expected) + { + actual.TenantId.ShouldBe(expected.TenantId); + actual.Executions.ShouldBe(expected.Executions); + actual.EffectiveTime.ShouldBe(expected.EffectiveTime); + actual.Executions.ShouldBe(expected.Executions); + } +} \ No newline at end of file diff --git a/src/Testing/MetricsTests/end_to_end_publishing_to_critter_watch.cs b/src/Testing/MetricsTests/end_to_end_publishing_to_critter_watch.cs new file mode 100644 index 000000000..dcadaf0a5 --- /dev/null +++ b/src/Testing/MetricsTests/end_to_end_publishing_to_critter_watch.cs @@ -0,0 +1,65 @@ +using JasperFx.Core; +using Newtonsoft.Json; +using Shouldly; +using Wolverine; +using Xunit.Abstractions; + +namespace MetricsTests; + +public class end_to_end_publishing_to_critter_watch +{ + private readonly ITestOutputHelper _output; + + public end_to_end_publishing_to_critter_watch(ITestOutputHelper output) + { + _output = output; + } + + [Fact] + public async Task run_for_quite_awhile_in_critter_watch_mode() + { + await using var pump = new MessagePump(); + + await pump.PumpMessagesAsync(WolverineMetricsMode.CritterWatch,30.Seconds()); + + var metrics = MetricsCollectionHandler.Collected; + + metrics.Length.ShouldBeGreaterThan(0); + + foreach (var metric in metrics) + { + _output.WriteLine(JsonConvert.SerializeObject(metric, Formatting.Indented)); + + // metric.PerTenant[0].Executions.Count.ShouldBeGreaterThan(0); + // metric.PerTenant[0].Executions.TotalTime.ShouldBeGreaterThan(0); + // metric.PerTenant[0].EffectiveTime.Count.ShouldBeGreaterThan(0); + // metric.PerTenant[0].EffectiveTime.TotalTime.ShouldBeGreaterThan(0); + + + } + } + + [Fact] + public async Task run_for_quite_awhile_in_hybrid_mode() + { + await using var pump = new MessagePump(); + + await pump.PumpMessagesAsync(WolverineMetricsMode.Hybrid,30.Seconds()); + + var metrics = MetricsCollectionHandler.Collected; + + metrics.Length.ShouldBeGreaterThan(0); + + foreach (var metric in metrics) + { + _output.WriteLine(JsonConvert.SerializeObject(metric, Formatting.Indented)); + + // metric.PerTenant[0].Executions.Count.ShouldBeGreaterThan(0); + // metric.PerTenant[0].Executions.TotalTime.ShouldBeGreaterThan(0); + // metric.PerTenant[0].EffectiveTime.Count.ShouldBeGreaterThan(0); + // metric.PerTenant[0].EffectiveTime.TotalTime.ShouldBeGreaterThan(0); + + + } + } +} \ No newline at end of file diff --git a/src/Wolverine/Envelope.Internals.cs b/src/Wolverine/Envelope.Internals.cs index 5c8734ecc..a035fed8e 100644 --- a/src/Wolverine/Envelope.Internals.cs +++ b/src/Wolverine/Envelope.Internals.cs @@ -102,6 +102,11 @@ internal long StopTiming() return _timer.ElapsedMilliseconds; } + /// + /// How long did the current execution take? + /// + internal long ExecutionTime => _timer.ElapsedMilliseconds; + /// /// /// diff --git a/src/Wolverine/Runtime/Handlers/Executor.cs b/src/Wolverine/Runtime/Handlers/Executor.cs index aa4a68d03..9643b5328 100644 --- a/src/Wolverine/Runtime/Handlers/Executor.cs +++ b/src/Wolverine/Runtime/Handlers/Executor.cs @@ -187,7 +187,7 @@ public async Task ExecuteAsync(MessageContext context, Cancellati _tracker.ExecutionFinished(envelope); - return MessageSucceededContinuation.Instance; + return new MessageSucceededContinuation(_tracker); } catch (Exception e) { @@ -271,14 +271,16 @@ public static IExecutor Build(IWolverineRuntime runtime, ObjectPool contextPool, - HandlerGraph handlerGraph, IMessageHandler handler) + HandlerGraph handlerGraph, IMessageHandler handler, IMessageTracker tracker) { var chain = (handler as MessageHandler)?.Chain; var timeoutSpan = chain?.DetermineMessageTimeout(runtime.Options) ?? 5.Seconds(); var rules = chain?.Failures.CombineRules(handlerGraph.Failures) ?? handlerGraph.Failures; - return new Executor(contextPool, runtime, handler, rules, timeoutSpan); + var logger = runtime.LoggerFactory.CreateLogger(handler.MessageType); + + return new Executor(contextPool, logger, handler, tracker, rules, timeoutSpan); } } \ No newline at end of file diff --git a/src/Wolverine/Runtime/IWolverineRuntime.cs b/src/Wolverine/Runtime/IWolverineRuntime.cs index eff6ff6a2..a3bd20ad8 100644 --- a/src/Wolverine/Runtime/IWolverineRuntime.cs +++ b/src/Wolverine/Runtime/IWolverineRuntime.cs @@ -6,6 +6,7 @@ using Wolverine.Persistence.Durability; using Wolverine.Runtime.Agents; using Wolverine.Runtime.Handlers; +using Wolverine.Runtime.Metrics; using Wolverine.Runtime.RemoteInvocation; using Wolverine.Runtime.Routing; @@ -26,6 +27,9 @@ public interface IWolverineRuntime IReplyTracker Replies { get; } IEndpointCollection Endpoints { get; } Meter Meter { get; } + + MetricsAccumulator MetricsAccumulator { get; } + ILoggerFactory LoggerFactory { get; } IAgentRuntime Agents { get; } diff --git a/src/Wolverine/Runtime/MessageSucceededContinuation.cs b/src/Wolverine/Runtime/MessageSucceededContinuation.cs index ab32331db..fa8195b96 100644 --- a/src/Wolverine/Runtime/MessageSucceededContinuation.cs +++ b/src/Wolverine/Runtime/MessageSucceededContinuation.cs @@ -1,6 +1,7 @@ using System.Diagnostics; using Microsoft.Extensions.Logging; using Wolverine.ErrorHandling; +using Wolverine.Logging; namespace Wolverine.Runtime; @@ -8,10 +9,17 @@ public class MessageSucceededContinuation : IContinuation { public static readonly MessageSucceededContinuation Instance = new(); + private IMessageTracker? _tracker; + private MessageSucceededContinuation() { } + public MessageSucceededContinuation(IMessageTracker tracker) + { + _tracker = tracker; + } + public async ValueTask ExecuteAsync(IEnvelopeLifecycle lifecycle, IWolverineRuntime runtime, DateTimeOffset now, Activity? activity) @@ -22,7 +30,7 @@ public async ValueTask ExecuteAsync(IEnvelopeLifecycle lifecycle, await lifecycle.CompleteAsync(); - runtime.MessageTracking.MessageSucceeded(lifecycle.Envelope!); + (_tracker ?? runtime.MessageTracking).MessageSucceeded(lifecycle.Envelope!); } catch (Exception ex) { diff --git a/src/Wolverine/Runtime/Metrics/IHandlerMetricsData.cs b/src/Wolverine/Runtime/Metrics/IHandlerMetricsData.cs new file mode 100644 index 000000000..f9a37c3bb --- /dev/null +++ b/src/Wolverine/Runtime/Metrics/IHandlerMetricsData.cs @@ -0,0 +1,7 @@ +namespace Wolverine.Runtime.Metrics; + +public interface IHandlerMetricsData +{ + string TenantId { get; } + void Apply(PerTenantTracking tracking); +} \ No newline at end of file diff --git a/src/Wolverine/Runtime/Metrics/MessageHandlingCounts.cs b/src/Wolverine/Runtime/Metrics/MessageHandlingCounts.cs new file mode 100644 index 000000000..55579f23b --- /dev/null +++ b/src/Wolverine/Runtime/Metrics/MessageHandlingCounts.cs @@ -0,0 +1,35 @@ +using JasperFx; +using JasperFx.Core; + +namespace Wolverine.Runtime.Metrics; + + +public class MessageHandlingCounts +{ + public string MessageType { get; } + public Uri Destination { get; } + + public MessageHandlingCounts(string messageType, Uri destination) + { + MessageType = messageType; + Destination = destination; + } + + public LightweightCache PerTenant { get; } = + new(tenantId => new PerTenantTracking(tenantId)); + + public void Increment(IHandlerMetricsData metricsData) + { + var perTenant = PerTenant[metricsData.TenantId ?? StorageConstants.DefaultTenantId]; + metricsData.Apply(perTenant); + } + + public void Clear() + { + foreach (var tracking in PerTenant) + { + tracking.Clear(); + } + + } +} \ No newline at end of file diff --git a/src/Wolverine/Runtime/Metrics/MessageHandlingMetrics.cs b/src/Wolverine/Runtime/Metrics/MessageHandlingMetrics.cs new file mode 100644 index 000000000..b266eaefe --- /dev/null +++ b/src/Wolverine/Runtime/Metrics/MessageHandlingMetrics.cs @@ -0,0 +1,18 @@ +using Wolverine.Persistence.Durability.DeadLetterManagement; + +namespace Wolverine.Runtime.Metrics; + +public record MessageHandlingMetrics( + int NodeNumber, + string MessageType, + Uri Destination, + TimeRange Range, + PerTenantMetrics[] PerTenant); + +public record PerTenantMetrics(string TenantId, Executions Executions, EffectiveTime EffectiveTime, ExceptionCounts[] Exceptions); + +public record Executions(int Count, long TotalTime); + +public record EffectiveTime(int Count, double TotalTime); + +public record ExceptionCounts(string ExceptionType, int Failures, int DeadLetters); diff --git a/src/Wolverine/Runtime/Metrics/MessageMetrics.cs b/src/Wolverine/Runtime/Metrics/MessageMetrics.cs new file mode 100644 index 000000000..236565ab7 --- /dev/null +++ b/src/Wolverine/Runtime/Metrics/MessageMetrics.cs @@ -0,0 +1,8 @@ +using Wolverine.Persistence.Durability.DeadLetterManagement; + +namespace Wolverine.Runtime.Metrics; + +/// +/// Message type to send remotely +/// +public record MessageMetrics(MessageHandlingMetrics[] Handled); \ No newline at end of file diff --git a/src/Wolverine/Runtime/Metrics/MessageTypeMetricsAccumulator.cs b/src/Wolverine/Runtime/Metrics/MessageTypeMetricsAccumulator.cs new file mode 100644 index 000000000..6f7b698bd --- /dev/null +++ b/src/Wolverine/Runtime/Metrics/MessageTypeMetricsAccumulator.cs @@ -0,0 +1,66 @@ +using JasperFx.Blocks; +using JasperFx.Core; +using Wolverine.Persistence.Durability.DeadLetterManagement; + +namespace Wolverine.Runtime.Metrics; + +public class MessageTypeMetricsAccumulator +{ + private readonly object _syncLock = new(); + + public string MessageType { get; } + public Uri Destination { get; } + + public MessageTypeMetricsAccumulator(string messageType, Uri destination) + { + MessageType = messageType; + Destination = destination; + + Counts = new MessageHandlingCounts(messageType, destination); + var processor = new Block(Process); + EntryPoint = processor.BatchUpstream(250.Milliseconds(), 500); + } + + public DateTimeOffset Starting { get; private set; } = DateTimeOffset.UtcNow; + + public MessageHandlingCounts Counts { get; } + + public IBlock EntryPoint { get; } + + public void Process(IHandlerMetricsData[] instruments) + { + lock (_syncLock) + { + foreach (var instrument in instruments) + { + try + { + Counts.Increment(instrument); + } + catch (Exception ) + { + // for now + } + } + } + } + + public MessageHandlingMetrics TriggerExport(int nodeNumber) + { + lock (_syncLock) + { + var time = DateTimeOffset.UtcNow; + + var metrics = new MessageHandlingMetrics( + nodeNumber, + MessageType, + Destination, + new TimeRange(Starting, time), + Counts.PerTenant.OrderBy(x => x.TenantId).Select(x => x.CompileAndReset()).ToArray()); + + Starting = time; + + return metrics; + } + } +} \ No newline at end of file diff --git a/src/Wolverine/Runtime/Metrics/MetricsAccumulator.cs b/src/Wolverine/Runtime/Metrics/MetricsAccumulator.cs new file mode 100644 index 000000000..79cb7cf2a --- /dev/null +++ b/src/Wolverine/Runtime/Metrics/MetricsAccumulator.cs @@ -0,0 +1,104 @@ +using System.Collections.Immutable; +using JasperFx.Core; +using Microsoft.Extensions.Logging; +using Wolverine.Configuration; + +namespace Wolverine.Runtime.Metrics; + +// TODO -- make this lazy on WolverineRuntime +public class MetricsAccumulator : IAsyncDisposable +{ + private readonly IWolverineRuntime _runtime; + private readonly object _syncLock = new(); + + private ImmutableArray _accumulators = ImmutableArray.Empty; + private Task _runner; + + public MetricsAccumulator(IWolverineRuntime runtime) + { + _runtime = runtime; + } + + public MessageTypeMetricsAccumulator FindAccumulator(string messageTypeName, Endpoint endpoint) + { + var endpointUri = endpoint.Uri; + return FindAccumulator(messageTypeName, endpointUri); + } + + public MessageTypeMetricsAccumulator FindAccumulator(string messageTypeName, Uri endpointUri) + { + var accumulator = + _accumulators.FirstOrDefault(x => x.MessageType == messageTypeName && x.Destination == endpointUri); + + if (accumulator != null) + { + return accumulator; + } + + lock (_syncLock) + { + accumulator = + _accumulators.FirstOrDefault(x => x.MessageType == messageTypeName && x.Destination == endpointUri); + + if (accumulator != null) + { + return accumulator; + } + + accumulator = new MessageTypeMetricsAccumulator(messageTypeName, endpointUri); + _accumulators = _accumulators.Add(accumulator); + } + + return accumulator; + } + + public async ValueTask DrainAsync() + { + var tasks = _accumulators.Select(x => x.EntryPoint.WaitForCompletionAsync()); + await Task.WhenAll(tasks); + } + + public DateTimeOffset From { get; private set; } = DateTimeOffset.UtcNow; + + public void Start() + { + _runner = Task.Run(async () => + { + try + { + while (!_runtime.Cancellation.IsCancellationRequested) + { + await Task.Delay(_runtime.Options.Metrics.SamplingPeriod); + try + { + var bus = new MessageBus(_runtime); + + foreach (var accumulator in _accumulators) + { + var metrics = accumulator.TriggerExport(_runtime.DurabilitySettings.AssignedNodeNumber); + + if (metrics.PerTenant.Length > 0) + { + await bus.PublishAsync(metrics); + } + } + } + catch (Exception e) + { + _runtime.Logger.LogError(e, "Error trying to export message handler metrics"); + } + } + } + catch (OperationCanceledException e) + { + // Nothing + } + }, _runtime.Cancellation); + } + + public ValueTask DisposeAsync() + { + _runner.SafeDispose(); + return new ValueTask(); + } +} \ No newline at end of file diff --git a/src/Wolverine/Runtime/Metrics/PerTenantTracking.cs b/src/Wolverine/Runtime/Metrics/PerTenantTracking.cs new file mode 100644 index 000000000..a15edda3d --- /dev/null +++ b/src/Wolverine/Runtime/Metrics/PerTenantTracking.cs @@ -0,0 +1,55 @@ +namespace Wolverine.Runtime.Metrics; + +public class PerTenantTracking +{ + public string TenantId { get; } + + public PerTenantTracking(string tenantId) + { + TenantId = tenantId; + } + + public int Executions { get; set; } + public long TotalExecutionTime { get; set; } + + public int Completions { get; set; } + public double TotalEffectiveTime { get; set; } + + public Dictionary DeadLetterCounts { get; } = new(); + public Dictionary Failures { get; } = new(); + + public PerTenantMetrics CompileAndReset() + { + var exceptionTypes = DeadLetterCounts.Keys.Union(Failures.Keys).ToArray(); + + var response = new PerTenantMetrics( + TenantId, + new Executions(Executions, TotalExecutionTime), + new EffectiveTime(Completions, TotalEffectiveTime), + exceptionTypes.OrderBy(x => x).Select(exceptionType => + { + int failures = 0; + int deadLetters = 0; + DeadLetterCounts.TryGetValue(exceptionType, out deadLetters); + Failures.TryGetValue(exceptionType, out failures); + + return new ExceptionCounts(exceptionType, failures, deadLetters); + }).ToArray() + ); + + Clear(); + + return response; + } + + public void Clear() + { + Executions = 0; + TotalExecutionTime = 0; + Completions = 0; + TotalEffectiveTime = 0; + DeadLetterCounts.Clear(); + Failures.Clear(); + } + +} \ No newline at end of file diff --git a/src/Wolverine/Runtime/Metrics/RecordDeadLetter.cs b/src/Wolverine/Runtime/Metrics/RecordDeadLetter.cs new file mode 100644 index 000000000..944383fd4 --- /dev/null +++ b/src/Wolverine/Runtime/Metrics/RecordDeadLetter.cs @@ -0,0 +1,12 @@ +namespace Wolverine.Runtime.Metrics; + +public record RecordDeadLetter(string ExceptionType, string TenantId) : IHandlerMetricsData +{ + public void Apply(PerTenantTracking tracking) + { + if (!tracking.DeadLetterCounts.TryAdd(ExceptionType, 1)) + { + tracking.DeadLetterCounts[ExceptionType] += 1; + } + } +} \ No newline at end of file diff --git a/src/Wolverine/Runtime/Metrics/RecordEffectiveTime.cs b/src/Wolverine/Runtime/Metrics/RecordEffectiveTime.cs new file mode 100644 index 000000000..dfda4405d --- /dev/null +++ b/src/Wolverine/Runtime/Metrics/RecordEffectiveTime.cs @@ -0,0 +1,10 @@ +namespace Wolverine.Runtime.Metrics; + +public record RecordEffectiveTime(double Time, string TenantId) : IHandlerMetricsData +{ + public void Apply(PerTenantTracking tracking) + { + tracking.Completions++; + tracking.TotalEffectiveTime += Time; + } +} \ No newline at end of file diff --git a/src/Wolverine/Runtime/Metrics/RecordExecutionTime.cs b/src/Wolverine/Runtime/Metrics/RecordExecutionTime.cs new file mode 100644 index 000000000..6907c9169 --- /dev/null +++ b/src/Wolverine/Runtime/Metrics/RecordExecutionTime.cs @@ -0,0 +1,10 @@ +namespace Wolverine.Runtime.Metrics; + +public record RecordExecutionTime(long Time, string TenantId) : IHandlerMetricsData +{ + public void Apply(PerTenantTracking tracking) + { + tracking.Executions++; + tracking.TotalExecutionTime += Time; + } +} \ No newline at end of file diff --git a/src/Wolverine/Runtime/Metrics/RecordFailure.cs b/src/Wolverine/Runtime/Metrics/RecordFailure.cs new file mode 100644 index 000000000..a4804c8c3 --- /dev/null +++ b/src/Wolverine/Runtime/Metrics/RecordFailure.cs @@ -0,0 +1,12 @@ +namespace Wolverine.Runtime.Metrics; + +public record RecordFailure(string ExceptionType, string TenantId) : IHandlerMetricsData +{ + public void Apply(PerTenantTracking tracking) + { + if (!tracking.Failures.TryAdd(ExceptionType, 1)) + { + tracking.Failures[ExceptionType] += 1; + } + } +} \ No newline at end of file diff --git a/src/Wolverine/Runtime/Wolverine.ExecutorFactory.cs b/src/Wolverine/Runtime/Wolverine.ExecutorFactory.cs index dc94206c6..f73e7dc4b 100644 --- a/src/Wolverine/Runtime/Wolverine.ExecutorFactory.cs +++ b/src/Wolverine/Runtime/Wolverine.ExecutorFactory.cs @@ -1,6 +1,10 @@ -using Wolverine.Configuration; +using JasperFx.Core.Reflection; +using Wolverine.Configuration; +using Wolverine.Logging; +using Wolverine.Runtime.Agents; using Wolverine.Runtime.Handlers; using Wolverine.Runtime.Partitioning; +using Wolverine.Util; namespace Wolverine.Runtime; @@ -33,10 +37,22 @@ IExecutor IExecutorFactory.BuildFor(Type messageType, Endpoint endpoint) handler = batching.BuildHandler(this); } } + + IMessageTracker tracker = this; + if (!messageType.CanBeCastTo() && Options.Metrics.Mode == WolverineMetricsMode.CritterWatch) + { + var accumulator = MetricsAccumulator.FindAccumulator(messageType.ToMessageTypeName(), endpoint); + tracker = new DirectMetricsPublishingMessageTracker(this, accumulator.EntryPoint); + } + else if (!messageType.CanBeCastTo() && Options.Metrics.Mode == WolverineMetricsMode.Hybrid) + { + var accumulator = MetricsAccumulator.FindAccumulator(messageType.ToMessageTypeName(), endpoint); + tracker = new HybridMetricsPublishingMessageTracker(this, accumulator.EntryPoint); + } var executor = handler == null ? new NoHandlerExecutor(messageType, this) - : Executor.Build(this, ExecutionPool, Handlers, handler); + : Executor.Build(this, ExecutionPool, Handlers, handler, tracker); return executor; } diff --git a/src/Wolverine/Runtime/WolverineRuntime.DirectMetrics.cs b/src/Wolverine/Runtime/WolverineRuntime.DirectMetrics.cs new file mode 100644 index 000000000..10f575c09 --- /dev/null +++ b/src/Wolverine/Runtime/WolverineRuntime.DirectMetrics.cs @@ -0,0 +1,118 @@ +using JasperFx.Blocks; +using JasperFx.Core.Reflection; +using Microsoft.Extensions.Logging; +using Wolverine.Logging; +using Wolverine.Runtime.Metrics; +using Wolverine.Tracking; + +namespace Wolverine.Runtime; + +public partial class WolverineRuntime +{ + internal class DirectMetricsPublishingMessageTracker : IMessageTracker + { + private readonly WolverineRuntime _runtime; + private readonly IBlock _sink; + private readonly string _serviceName; + private readonly Guid _uniqueNodeId; + + public DirectMetricsPublishingMessageTracker(WolverineRuntime runtime, IBlock sink) + { + _runtime = runtime; + _sink = sink; + _serviceName = runtime._serviceName; + _uniqueNodeId = runtime._uniqueNodeId; + Logger = _runtime.Logger; + } + + public void LogException(Exception ex, object? correlationId = null, string message = "Exception detected:") + { + _runtime.LogException(ex, correlationId, message); + } + + public ILogger Logger { get; } + + public void Sent(Envelope envelope) + { + // I think we'll have a different mechanism for this + _runtime.ActiveSession?.MaybeRecord(MessageEventType.Sent, envelope, _serviceName, _uniqueNodeId); + _sent(Logger, envelope.CorrelationId!, envelope.GetMessageTypeName(), envelope.Id, + envelope.Destination?.ToString() ?? string.Empty, + null); + } + + public void Received(Envelope envelope) + { + _runtime.ActiveSession?.Record(MessageEventType.Received, envelope, _serviceName, _uniqueNodeId); + _received(Logger, envelope.CorrelationId!, envelope.GetMessageTypeName(), envelope.Id, + envelope.Destination?.ToString() ?? string.Empty, + envelope.ReplyUri?.ToString() ?? string.Empty, null); + } + + public void ExecutionStarted(Envelope envelope) + { + _runtime.ExecutionStarted(envelope); + } + + public void ExecutionFinished(Envelope envelope) + { + var executionTime = envelope.StopTiming(); + if (executionTime > 0) + { + _sink.Post(new RecordExecutionTime(executionTime, envelope.TenantId)); + } + + _runtime.ActiveSession?.Record(MessageEventType.ExecutionFinished, envelope, _serviceName, _uniqueNodeId); + } + + public void ExecutionFinished(Envelope envelope, Exception exception) + { + ExecutionFinished(envelope); + _sink.Post(new RecordFailure(exception.GetType().FullNameInCode(), envelope.TenantId)); + } + + public void MessageSucceeded(Envelope envelope) + { + var time = DateTimeOffset.UtcNow.Subtract(envelope.SentAt.ToUniversalTime()).TotalMilliseconds; + _sink.Post(new RecordEffectiveTime(time, envelope.TenantId)); + + _runtime.ActiveSession?.Record(MessageEventType.MessageSucceeded, envelope, _serviceName, _uniqueNodeId); + } + + public void MessageFailed(Envelope envelope, Exception ex) + { + var time = DateTimeOffset.UtcNow.Subtract(envelope.SentAt.ToUniversalTime()).TotalMilliseconds; + _sink.Post(new RecordEffectiveTime(time, envelope.TenantId)); + _sink.Post(new RecordDeadLetter(ex.GetType().FullNameInCode(), envelope.TenantId)); + + _runtime.ActiveSession?.Record(MessageEventType.Sent, envelope, _serviceName, _uniqueNodeId, ex); + } + + public void NoHandlerFor(Envelope envelope) + { + _runtime.NoHandlerFor(envelope); + } + + public void NoRoutesFor(Envelope envelope) + { + _runtime.NoRoutesFor(envelope); + } + + public void MovedToErrorQueue(Envelope envelope, Exception ex) + { + _runtime.ActiveSession?.Record(MessageEventType.MovedToErrorQueue, envelope, _serviceName, _uniqueNodeId); + _movedToErrorQueue(Logger, envelope, ex); + _sink.Post(new RecordDeadLetter(ex.GetType().FullNameInCode(), envelope.TenantId)); + } + + public void DiscardedEnvelope(Envelope envelope) + { + _undeliverable(Logger, envelope, null); + } + + public void Requeued(Envelope envelope) + { + _runtime.Requeued(envelope); + } + } +} \ No newline at end of file diff --git a/src/Wolverine/Runtime/WolverineRuntime.Disposal.cs b/src/Wolverine/Runtime/WolverineRuntime.Disposal.cs index f70a773bc..85a5c1095 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.Disposal.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.Disposal.cs @@ -33,5 +33,10 @@ async ValueTask IAsyncDisposable.DisposeAsync() { await definition.As().DisposeAsync(); } + + if (_accumulator.IsValueCreated) + { + await _accumulator.Value.DisposeAsync(); + } } } \ No newline at end of file diff --git a/src/Wolverine/Runtime/WolverineRuntime.HostService.cs b/src/Wolverine/Runtime/WolverineRuntime.HostService.cs index 6a1df99ea..bd37c3f3b 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.HostService.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.HostService.cs @@ -175,7 +175,7 @@ public async Task StopAsync(CancellationToken cancellationToken) return; } - _agentCancellation.Cancel(); + await _agentCancellation.CancelAsync(); _hasStopped = true; @@ -210,6 +210,11 @@ public async Task StopAsync(CancellationToken cancellationToken) await teardownAgentsAsync(); await _endpoints.DrainAsync(); + + if (_accumulator.IsValueCreated) + { + await _accumulator.Value.DrainAsync(); + } } DurabilitySettings.Cancel(); @@ -235,6 +240,12 @@ private void startInMemoryScheduledJobs() private async Task startMessagingTransportsAsync() { + // Start up metrics collection + if (Options.Metrics.Mode != WolverineMetricsMode.SystemDiagnosticsMeter) + { + _accumulator.Value.Start(); + } + discoverListenersFromConventions(); // No local queues if running in Serverless diff --git a/src/Wolverine/Runtime/WolverineRuntime.HybridMetricsPublishingMessageTracker.cs b/src/Wolverine/Runtime/WolverineRuntime.HybridMetricsPublishingMessageTracker.cs new file mode 100644 index 000000000..d380dbea7 --- /dev/null +++ b/src/Wolverine/Runtime/WolverineRuntime.HybridMetricsPublishingMessageTracker.cs @@ -0,0 +1,101 @@ +using JasperFx.Blocks; +using JasperFx.Core.Reflection; +using Wolverine.Logging; +using Wolverine.Runtime.Metrics; + +namespace Wolverine.Runtime; + +public partial class WolverineRuntime +{ + internal class HybridMetricsPublishingMessageTracker : IMessageTracker + { + private readonly WolverineRuntime _runtime; + private readonly IBlock _sink; + + public HybridMetricsPublishingMessageTracker(WolverineRuntime runtime, IBlock sink) + { + _runtime = runtime; + _sink = sink; + } + + public void LogException(Exception ex, object? correlationId = null, string message = "Exception detected:") + { + _runtime.LogException(ex, correlationId, message); + } + + public void Sent(Envelope envelope) + { + _runtime.Sent(envelope); + } + + public void Received(Envelope envelope) + { + _runtime.Received(envelope); + } + + public void ExecutionStarted(Envelope envelope) + { + _runtime.ExecutionStarted(envelope); + } + + public void ExecutionFinished(Envelope envelope) + { + var executionTime = envelope.StopTiming(); + if (executionTime > 0) + { + _sink.Post(new RecordExecutionTime(executionTime, envelope.TenantId)); + } + + _runtime.ExecutionFinished(envelope); + } + + public void ExecutionFinished(Envelope envelope, Exception exception) + { + _runtime.ExecutionFinished(envelope, exception); + _sink.Post(new RecordFailure(exception.GetType().FullNameInCode(), envelope.TenantId)); + } + + public void MessageSucceeded(Envelope envelope) + { + var time = DateTimeOffset.UtcNow.Subtract(envelope.SentAt.ToUniversalTime()).TotalMilliseconds; + _sink.Post(new RecordEffectiveTime(time, envelope.TenantId)); + + _runtime.MessageSucceeded(envelope); + } + + public void MessageFailed(Envelope envelope, Exception ex) + { + var time = DateTimeOffset.UtcNow.Subtract(envelope.SentAt.ToUniversalTime()).TotalMilliseconds; + _sink.Post(new RecordEffectiveTime(time, envelope.TenantId)); + _sink.Post(new RecordDeadLetter(ex.GetType().FullNameInCode(), envelope.TenantId)); + + _runtime.MessageFailed(envelope, ex); + } + + public void NoHandlerFor(Envelope envelope) + { + _runtime.NoHandlerFor(envelope); + } + + public void NoRoutesFor(Envelope envelope) + { + _runtime.NoRoutesFor(envelope); + } + + public void MovedToErrorQueue(Envelope envelope, Exception ex) + { + _runtime.MovedToErrorQueue(envelope, ex); + _sink.Post(new RecordDeadLetter(ex.GetType().FullNameInCode(), envelope.TenantId)); + } + + public void DiscardedEnvelope(Envelope envelope) + { + _runtime.DiscardedEnvelope(envelope); + } + + public void Requeued(Envelope envelope) + { + _runtime.Requeued(envelope); + } + } +} \ No newline at end of file diff --git a/src/Wolverine/Runtime/WolverineRuntime.Tracking.cs b/src/Wolverine/Runtime/WolverineRuntime.Tracking.cs index 7fcbfdeb2..34d1fb32e 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.Tracking.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.Tracking.cs @@ -1,6 +1,9 @@ using System.Diagnostics.Metrics; +using JasperFx.Core; +using JasperFx.Core.Reflection; using Microsoft.Extensions.Logging; using Wolverine.Logging; +using Wolverine.Runtime.Metrics; using Wolverine.Tracking; namespace Wolverine.Runtime; @@ -132,6 +135,12 @@ public void MovedToErrorQueue(Envelope envelope, Exception ex) { ActiveSession?.Record(MessageEventType.MovedToErrorQueue, envelope, _serviceName, _uniqueNodeId); _movedToErrorQueue(Logger, envelope, ex); + + if (Options.Metrics.Mode != WolverineMetricsMode.SystemDiagnosticsMeter && envelope.MessageType.IsNotEmpty()) + { + var accumulator = _accumulator.Value.FindAccumulator(envelope.MessageType, envelope.Destination); + accumulator.EntryPoint.Post(new RecordDeadLetter(ex.GetType().FullNameInCode(), envelope.TenantId)); + } } public void DiscardedEnvelope(Envelope envelope) diff --git a/src/Wolverine/Runtime/WolverineRuntime.cs b/src/Wolverine/Runtime/WolverineRuntime.cs index c25c542c5..83a406517 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.cs @@ -13,6 +13,7 @@ using Wolverine.Persistence.Durability; using Wolverine.Runtime.Agents; using Wolverine.Runtime.Handlers; +using Wolverine.Runtime.Metrics; using Wolverine.Runtime.RemoteInvocation; using Wolverine.Runtime.Routing; using Wolverine.Runtime.Scheduled; @@ -25,13 +26,14 @@ public sealed partial class WolverineRuntime : IWolverineRuntime, IHostedService private readonly EndpointCollection _endpoints; private readonly LightweightCache _invokers; - private readonly string _serviceName; + private string _serviceName; private readonly Guid _uniqueNodeId; private ImHashMap _extensions = ImHashMap.Empty; private bool _hasStopped; private readonly Lazy _stores; + private readonly Lazy _accumulator; public WolverineRuntime(WolverineOptions options, IServiceContainer container, @@ -41,6 +43,8 @@ public WolverineRuntime(WolverineOptions options, Options = options; Handlers = options.HandlerGraph; + _accumulator = new Lazy(() => new MetricsAccumulator(this)); + _stores = new Lazy(() => container.Services.GetRequiredService()); @@ -100,6 +104,8 @@ public WolverineRuntime(WolverineOptions options, } } + public MetricsAccumulator MetricsAccumulator => _accumulator.Value; + public IWolverineObserver Observer { get; set; } public IServiceProvider Services => _container.Services; diff --git a/src/Wolverine/Util/Dataflow/BatchingBlock.cs b/src/Wolverine/Util/Dataflow/BatchingBlock.cs deleted file mode 100644 index b66274acd..000000000 --- a/src/Wolverine/Util/Dataflow/BatchingBlock.cs +++ /dev/null @@ -1,87 +0,0 @@ -using System.Threading.Tasks.Dataflow; -using JasperFx.Core; - -namespace Wolverine.Util.Dataflow; - -public class BatchingBlock : IDisposable -{ - private readonly BatchBlock _batchBlock; - private readonly TimeSpan _timeSpan; - private readonly Timer _trigger; - - public BatchingBlock(int milliseconds, ITargetBlock processor, - CancellationToken cancellation = default) - : this(milliseconds.Milliseconds(), processor, 100, cancellation) - { - } - - public BatchingBlock(TimeSpan timeSpan, ITargetBlock processor, int batchSize = 100, - CancellationToken cancellation = default) - { - _timeSpan = timeSpan; - _batchBlock = new BatchBlock(batchSize, new GroupingDataflowBlockOptions - { - CancellationToken = cancellation, - BoundedCapacity = DataflowBlockOptions.Unbounded - }); - - _trigger = new Timer(_ => - { - try - { - _batchBlock.TriggerBatch(); - } - catch (Exception) - { - // ignored - } - }, null, Timeout.Infinite, Timeout.Infinite); - - - _batchBlock.LinkTo(processor); - } - - public int ItemCount => _batchBlock.OutputCount; - - public Task Completion => _batchBlock.Completion; - - - public void Dispose() - { - _trigger.Dispose(); - _batchBlock.Complete(); - } - - public void Send(T item) - { - try - { - _trigger.Change(_timeSpan, Timeout.InfiniteTimeSpan); - } - catch (Exception) - { - // ignored - } - - _batchBlock.Post(item); - } - - public Task SendAsync(T item) - { - try - { - _trigger.Change(_timeSpan, Timeout.InfiniteTimeSpan); - } - catch (Exception) - { - // ignored - } - - return _batchBlock.SendAsync(item); - } - - public void Complete() - { - _batchBlock.Complete(); - } -} \ No newline at end of file diff --git a/src/Wolverine/WolverineOptions.cs b/src/Wolverine/WolverineOptions.cs index cbba36eda..53cc1d8be 100644 --- a/src/Wolverine/WolverineOptions.cs +++ b/src/Wolverine/WolverineOptions.cs @@ -36,6 +36,42 @@ public enum MultipleHandlerBehavior Separated } +public enum WolverineMetricsMode +{ + /// + /// Wolverine will publish performance metrics via System.Diagnostics.Meter + /// where any Otel tooling can be configured to scrape metrics + /// + SystemDiagnosticsMeter, + + /// + /// Wolverine will accumulate and occasionally publish performance metrics + /// via messaging to subscribers configured to listen for Wolverine performance + /// data + /// + CritterWatch, + + /// + /// Wolverine will both accumulate and publish metrics information to CritterWatch + /// *and* publish metrics via System.Diagnostics.Meter + /// + Hybrid +} + +public class MetricsOptions +{ + /// + /// How should Wolverine collect and publish metrics about message handling and publications? + /// + public WolverineMetricsMode Mode { get; set; } = WolverineMetricsMode.SystemDiagnosticsMeter; + + /// + /// If using either CritterWatch or Hybrid metrics publishing, this is the period in which + /// Wolverine will sample and publish metric data collection. Default is 5 seconds + /// + public TimeSpan SamplingPeriod { get; set; } = 5.Seconds(); +} + /// /// Completely defines and configures a Wolverine application /// @@ -46,6 +82,7 @@ public sealed partial class WolverineOptions public WolverineOptions() : this(null) { + } public WolverineOptions(string? assemblyName) @@ -82,6 +119,8 @@ public WolverineOptions(string? assemblyName) MessagePartitioning = new MessagePartitioningRules(this); } + public MetricsOptions Metrics { get; } = new(); + /// /// What is the policy within this application for whether or not it is valid to allow Service Location within /// the generated code for message handlers or HTTP endpoints. Default is AllowedByWarn. Just keep in mind that diff --git a/wolverine.sln b/wolverine.sln index 980483151..0aa833226 100644 --- a/wolverine.sln +++ b/wolverine.sln @@ -297,6 +297,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wolverine.SignalR.Tests", " EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WolverineChat", "src\Samples\WolverineChat\WolverineChat.csproj", "{08C40523-A236-4127-8400-2DBCAB3E98CF}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MetricsTests", "src\Testing\MetricsTests\MetricsTests.csproj", "{501C5BD8-41D5-4032-85C7-33F8815EA60D}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -1663,6 +1665,18 @@ Global {08C40523-A236-4127-8400-2DBCAB3E98CF}.Release|x64.Build.0 = Release|Any CPU {08C40523-A236-4127-8400-2DBCAB3E98CF}.Release|x86.ActiveCfg = Release|Any CPU {08C40523-A236-4127-8400-2DBCAB3E98CF}.Release|x86.Build.0 = Release|Any CPU + {501C5BD8-41D5-4032-85C7-33F8815EA60D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {501C5BD8-41D5-4032-85C7-33F8815EA60D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {501C5BD8-41D5-4032-85C7-33F8815EA60D}.Debug|x64.ActiveCfg = Debug|Any CPU + {501C5BD8-41D5-4032-85C7-33F8815EA60D}.Debug|x64.Build.0 = Debug|Any CPU + {501C5BD8-41D5-4032-85C7-33F8815EA60D}.Debug|x86.ActiveCfg = Debug|Any CPU + {501C5BD8-41D5-4032-85C7-33F8815EA60D}.Debug|x86.Build.0 = Debug|Any CPU + {501C5BD8-41D5-4032-85C7-33F8815EA60D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {501C5BD8-41D5-4032-85C7-33F8815EA60D}.Release|Any CPU.Build.0 = Release|Any CPU + {501C5BD8-41D5-4032-85C7-33F8815EA60D}.Release|x64.ActiveCfg = Release|Any CPU + {501C5BD8-41D5-4032-85C7-33F8815EA60D}.Release|x64.Build.0 = Release|Any CPU + {501C5BD8-41D5-4032-85C7-33F8815EA60D}.Release|x86.ActiveCfg = Release|Any CPU + {501C5BD8-41D5-4032-85C7-33F8815EA60D}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -1800,5 +1814,6 @@ Global {36645C4B-BE1F-4184-A14C-D5BFA3F86A86} = {1EF7D49F-DDB8-469A-88A0-4A8D6237561C} {3F62DB30-9A29-487A-9EE2-22A097E3EE3F} = {1EF7D49F-DDB8-469A-88A0-4A8D6237561C} {08C40523-A236-4127-8400-2DBCAB3E98CF} = {1EF7D49F-DDB8-469A-88A0-4A8D6237561C} + {501C5BD8-41D5-4032-85C7-33F8815EA60D} = {96119B5E-B5F0-400A-9580-B342EBE26212} EndGlobalSection EndGlobal