diff --git a/src/Testing/CoreTests/Runtime/MockWolverineRuntime.cs b/src/Testing/CoreTests/Runtime/MockWolverineRuntime.cs index c1457532c..536d8ca28 100644 --- a/src/Testing/CoreTests/Runtime/MockWolverineRuntime.cs +++ b/src/Testing/CoreTests/Runtime/MockWolverineRuntime.cs @@ -12,6 +12,7 @@ using Wolverine.Runtime; using Wolverine.Runtime.Agents; using Wolverine.Runtime.Handlers; +using Wolverine.Runtime.Metrics; using Wolverine.Runtime.RemoteInvocation; using Wolverine.Runtime.Routing; using Wolverine.Transports; @@ -51,8 +52,11 @@ public class MockWolverineRuntime : IWolverineRuntime, IObserver(); void IObserver.OnCompleted() diff --git a/src/Testing/CoreTests/WolverineOptionsTests.cs b/src/Testing/CoreTests/WolverineOptionsTests.cs index 626b146e7..9f184fd48 100644 --- a/src/Testing/CoreTests/WolverineOptionsTests.cs +++ b/src/Testing/CoreTests/WolverineOptionsTests.cs @@ -251,6 +251,14 @@ public void enable_remote_invocation_is_true_by_default() new WolverineOptions().EnableRemoteInvocation.ShouldBeTrue(); } + [Fact] + public void metrics_defaults() + { + var options = new WolverineOptions(); + options.Metrics.Mode.ShouldBe(WolverineMetricsMode.SystemDiagnosticsMeter); + options.Metrics.SamplingPeriod.ShouldBe(5.Seconds()); + } + public interface IFoo; public class Foo : IFoo; diff --git a/src/Testing/MetricsTests/InstrumentPump.cs b/src/Testing/MetricsTests/InstrumentPump.cs new file mode 100644 index 000000000..67139ac73 --- /dev/null +++ b/src/Testing/MetricsTests/InstrumentPump.cs @@ -0,0 +1,109 @@ +using System.Transactions; +using JasperFx; +using JasperFx.Core.Reflection; +using JasperFx.MultiTenancy; +using Wolverine.Runtime.Metrics; + +namespace MetricsTests; + +public class InstrumentPump +{ + private readonly string[] _tenants; + + private readonly string[] _exceptions = + [ + typeof(BadImageFormatException).FullNameInCode(), typeof(DivideByZeroException).FullNameInCode(), + typeof(InvalidTimeZoneException).FullNameInCode(), typeof(UnknownTenantIdException).FullNameInCode() + ]; + + public InstrumentPump(string[] tenants) + { + _tenants = tenants; + } + + public List Data { get; } = new(); + + public PerTenantMetrics GetExpectedForTenantId(string tenantId) + { + var tracking = new PerTenantTracking(tenantId); + foreach (var data in Data.Where(x => x.TenantId == tenantId)) + { + data.Apply(tracking); + } + + return tracking.CompileAndReset(); + } + + public async Task Publish(int number, MessageTypeMetricsAccumulator accumulator) + { + Func publish = d => + { + Data.Add(d); + return accumulator.EntryPoint.PostAsync(d); + }; + + for (int i = 0; i < number; i++) + { + var tenantId = determineTenantId(); + + var random = Random.Shared.Next(0, 10); + + if (random < 6) + { + var executionTime = Random.Shared.Next(50, 1000); + var effectiveTime = executionTime + Random.Shared.NextDouble(); + await publish(new RecordExecutionTime(executionTime, tenantId)); + await publish(new RecordEffectiveTime(effectiveTime, tenantId)); + } + else if (random <= 9) + { + var exceptionType = _exceptions[Random.Shared.Next(0, _exceptions.Length - 1)]; + await publish(new RecordFailure(exceptionType, tenantId)); + } + else + { + var exceptionType = _exceptions[Random.Shared.Next(0, _exceptions.Length - 1)]; + await publish(new RecordDeadLetter(exceptionType, tenantId)); + } + } + } + + public async Task Publish(int number, string tenantId, MessageTypeMetricsAccumulator accumulator) + { + Func publish = d => + { + Data.Add(d); + return accumulator.EntryPoint.PostAsync(d); + }; + + for (int i = 0; i < number; i++) + { + var random = Random.Shared.Next(0, 10); + + if (random < 6) + { + var executionTime = Random.Shared.Next(50, 1000); + var effectiveTime = executionTime + Random.Shared.NextDouble(); + await publish(new RecordExecutionTime(executionTime, tenantId)); + await publish(new RecordEffectiveTime(effectiveTime, tenantId)); + } + else if (random <= 9) + { + var exceptionType = _exceptions[Random.Shared.Next(0, _exceptions.Length - 1)]; + await publish(new RecordFailure(exceptionType, tenantId)); + } + else + { + var exceptionType = _exceptions[Random.Shared.Next(0, _exceptions.Length - 1)]; + await publish(new RecordDeadLetter(exceptionType, tenantId)); + } + } + } + + private string determineTenantId() + { + if (_tenants.Length <= 1) return StorageConstants.DefaultTenantId; + + return _tenants[Random.Shared.Next(0, _tenants.Length - 1)]; + } +} \ No newline at end of file diff --git a/src/Testing/MetricsTests/InstrumentsTests.cs b/src/Testing/MetricsTests/InstrumentsTests.cs new file mode 100644 index 000000000..f1067cede --- /dev/null +++ b/src/Testing/MetricsTests/InstrumentsTests.cs @@ -0,0 +1,72 @@ +using JasperFx.Core.Reflection; +using Shouldly; +using Wolverine.Runtime.Metrics; + +namespace MetricsTests; + +public class InstrumentsTests +{ + [Fact] + public void record_failure() + { + var exceptionType = typeof(BadImageFormatException).FullNameInCode(); + var failure = new RecordFailure(exceptionType, "t1"); + + var tracking = new PerTenantTracking("t1"); + + failure.Apply(tracking); + tracking.Failures[exceptionType].ShouldBe(1); + tracking.DeadLetterCounts.ContainsKey(exceptionType).ShouldBeFalse(); + } + + [Fact] + public void record_dead_letter() + { + var exceptionType = typeof(BadImageFormatException).FullNameInCode(); + var deadLetter = new RecordDeadLetter(exceptionType, "t1"); + + var tracking = new PerTenantTracking("t1"); + + deadLetter.Apply(tracking); + tracking.DeadLetterCounts[exceptionType].ShouldBe(1); + tracking.Failures.ContainsKey(exceptionType).ShouldBeFalse(); + } + + [Fact] + public void record_effective_time() + { + var effectiveTime1 = new RecordEffectiveTime(11.2, "t1"); + var effectiveTime2 = new RecordEffectiveTime(2.1, "t1"); + var effectiveTime3 = new RecordEffectiveTime(3.5, "t1"); + + var tracking = new PerTenantTracking("t1"); + + effectiveTime1.Apply(tracking); + effectiveTime2.Apply(tracking); + effectiveTime3.Apply(tracking); + + tracking.TotalEffectiveTime.ShouldBe(effectiveTime1.Time + effectiveTime2.Time + effectiveTime3.Time); + tracking.Completions.ShouldBe(3); + } + + [Fact] + public void record_execution_time() + { + var execution1 = new RecordExecutionTime(Random.Shared.Next(100, 1000), "t1"); + var execution2 = new RecordExecutionTime(Random.Shared.Next(100, 1000), "t1"); + var execution3 = new RecordExecutionTime(Random.Shared.Next(100, 1000), "t1"); + var execution4 = new RecordExecutionTime(Random.Shared.Next(100, 1000), "t1"); + + var tracking = new PerTenantTracking("t1"); + + execution1.Apply(tracking); + execution2.Apply(tracking); + execution3.Apply(tracking); + execution4.Apply(tracking); + + tracking.Executions.ShouldBe(4); + tracking.TotalExecutionTime.ShouldBe(execution1.Time + execution2.Time + execution3.Time + execution4.Time); + + } + +} \ No newline at end of file diff --git a/src/Testing/MetricsTests/MessageHandlingCountsTests.cs b/src/Testing/MetricsTests/MessageHandlingCountsTests.cs new file mode 100644 index 000000000..1a2af0686 --- /dev/null +++ b/src/Testing/MetricsTests/MessageHandlingCountsTests.cs @@ -0,0 +1,51 @@ +using Shouldly; +using Wolverine.Runtime.Metrics; + +namespace MetricsTests; + +public class MessageHandlingCountsTests +{ + [Fact] + public void instrument_across_tenants() + { + var counts = new MessageHandlingCounts("m1", new Uri("stub://one")); + + counts.Increment(new RecordExecutionTime(23, "t1")); + counts.Increment(new RecordExecutionTime(45, "t2")); + counts.Increment(new RecordExecutionTime(55, "t1")); + counts.Increment(new RecordExecutionTime(20, "t3")); + counts.Increment(new RecordExecutionTime(117, "t3")); + counts.Increment(new RecordExecutionTime(10, "t2")); + + counts.PerTenant["t1"].Executions.ShouldBe(2); + counts.PerTenant["t1"].TotalExecutionTime.ShouldBe(23 + 55); + + counts.PerTenant["t2"].Executions.ShouldBe(2); + counts.PerTenant["t2"].TotalExecutionTime.ShouldBe(45 + 10); + + counts.PerTenant["t3"].Executions.ShouldBe(2); + counts.PerTenant["t3"].TotalExecutionTime.ShouldBe(20 + 117); + } + + [Fact] + public void clear() + { + var counts = new MessageHandlingCounts("m1", new Uri("stub://one")); + counts.Increment(new RecordExecutionTime(23, "t1")); + counts.Increment(new RecordExecutionTime(45, "t2")); + counts.Increment(new RecordExecutionTime(55, "t1")); + counts.Increment(new RecordExecutionTime(20, "t3")); + counts.Increment(new RecordExecutionTime(117, "t3")); + counts.Increment(new RecordExecutionTime(10, "t2")); + counts.Clear(); + + counts.PerTenant["t1"].Executions.ShouldBe(0); + counts.PerTenant["t1"].TotalExecutionTime.ShouldBe(0); + + counts.PerTenant["t2"].Executions.ShouldBe(0); + counts.PerTenant["t2"].TotalExecutionTime.ShouldBe(0); + + counts.PerTenant["t3"].Executions.ShouldBe(0); + counts.PerTenant["t3"].TotalExecutionTime.ShouldBe(0); + } +} \ No newline at end of file diff --git a/src/Testing/MetricsTests/MessagePump.cs b/src/Testing/MetricsTests/MessagePump.cs new file mode 100644 index 000000000..9650abb0f --- /dev/null +++ b/src/Testing/MetricsTests/MessagePump.cs @@ -0,0 +1,146 @@ +using System.Collections.Immutable; +using System.Diagnostics; +using Microsoft.Extensions.Hosting; +using Wolverine; +using Wolverine.ErrorHandling; +using Wolverine.Runtime.Metrics; + +namespace MetricsTests; + +public class MessagePump : IAsyncDisposable +{ + private IHost _host; + + public async Task StartHostAsync(WolverineMetricsMode mode) + { + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.Metrics.Mode = mode; + opts.OnAnyException().RetryTimes(3).Then.MoveToErrorQueue(); + + opts.LocalQueueFor().Sequential(); + }).StartAsync(); + } + + public async Task PumpMessagesAsync(WolverineMetricsMode mode, TimeSpan duration) + { + if (_host == null) + { + await StartHostAsync(mode); + } + + MetricsCollectionHandler.Clear(); + + var stopwatch = new Stopwatch(); + stopwatch.Start(); + + while (stopwatch.ElapsedMilliseconds < duration.TotalMilliseconds) + { + var bus = _host.MessageBus(); + for (int i = 0; i < 100; i++) + { + for (int j = 0; j < Random.Shared.Next(1, 5); j++) + { + await bus.PublishAsync(new M1(Guid.CreateVersion7())); + } + + for (int j = 0; j < Random.Shared.Next(1, 5); j++) + { + await bus.PublishAsync(new M2(Guid.CreateVersion7())); + } + + for (int j = 0; j < Random.Shared.Next(1, 5); j++) + { + await bus.PublishAsync(new M3(Guid.CreateVersion7(), Random.Shared.Next(0, 10))); + } + + for (int j = 0; j < Random.Shared.Next(1, 5); j++) + { + await bus.PublishAsync(new M4(Guid.CreateVersion7(), Random.Shared.Next(0, 10))); + } + + for (int j = 0; j < Random.Shared.Next(1, 5); j++) + { + await bus.PublishAsync(new M5(Guid.CreateVersion7())); + } + + } + } + } + + public async ValueTask DisposeAsync() + { + if (_host is IAsyncDisposable hostAsyncDisposable) + { + await hostAsyncDisposable.DisposeAsync(); + } + else + { + _host.Dispose(); + } + } +} + +public static class MetricsCollectionHandler +{ + public static ImmutableArray Collected { get; private set; } + = ImmutableArray.Empty; + + + public static void Clear() + { + Collected = ImmutableArray.Empty; + } + + public static void Handle(MessageHandlingMetrics metrics) + { + Collected = Collected.Add(metrics); + } +} + +public record M1(Guid Id); +public record M2(Guid Id); + +// Need to retry on errors to see that happen +public record M3(Guid Id, int Fails); +public record M4(Guid Id, int Fails); +public record M5(Guid Id); + +public static class MessagesHandler +{ + public static async Task HandleAsync(M1 m) + { + await Task.Delay(Random.Shared.Next(25, 100)); + } + + public static async Task HandleAsync(M2 m) + { + await Task.Delay(Random.Shared.Next(25, 100)); + } + + public static async Task HandleAsync(M3 m, Envelope envelope) + { + if (m.Fails > envelope.Attempts) + { + throw new BadImageFormatException(); + } + + await Task.Delay(Random.Shared.Next(25, 100)); + } + + public static async Task HandleAsync(M4 m, Envelope envelope) + { + if (m.Fails > envelope.Attempts) + { + throw new DivideByZeroException(); + } + + await Task.Delay(Random.Shared.Next(25, 100)); + } + + public static async Task HandleAsync(M4 m) + { + await Task.Delay(Random.Shared.Next(25, 100)); + } +} \ No newline at end of file diff --git a/src/Testing/MetricsTests/MetricsTests.csproj b/src/Testing/MetricsTests/MetricsTests.csproj new file mode 100644 index 000000000..f37e99173 --- /dev/null +++ b/src/Testing/MetricsTests/MetricsTests.csproj @@ -0,0 +1,26 @@ + + + + net9.0 + enable + enable + false + + + + + + + + + + + + + + + + + + + diff --git a/src/Testing/MetricsTests/PerTenantTrackingTests.cs b/src/Testing/MetricsTests/PerTenantTrackingTests.cs new file mode 100644 index 000000000..329f83959 --- /dev/null +++ b/src/Testing/MetricsTests/PerTenantTrackingTests.cs @@ -0,0 +1,28 @@ +using Shouldly; +using Wolverine.Runtime.Metrics; + +namespace MetricsTests; + +public class PerTenantTrackingTests +{ + [Fact] + public void per_tenant_clear() + { + var perTenant = new PerTenantTracking("t1"); + perTenant.Executions++; + perTenant.TotalExecutionTime += 10; + perTenant.DeadLetterCounts["foo"] = 1; + perTenant.Failures["bar"] = 1; + perTenant.Completions++; + perTenant.TotalEffectiveTime = 22.4; + + perTenant.Clear(); + + perTenant.Executions.ShouldBe(0); + perTenant.TotalExecutionTime.ShouldBe(0); + perTenant.DeadLetterCounts.Count.ShouldBe(0); + perTenant.Failures.Count.ShouldBe(0); + perTenant.Completions.ShouldBe(0); + perTenant.TotalEffectiveTime.ShouldBe(0); + } +} \ No newline at end of file diff --git a/src/Testing/MetricsTests/accumulating_metrics_for_a_single_message_and_destination.cs b/src/Testing/MetricsTests/accumulating_metrics_for_a_single_message_and_destination.cs new file mode 100644 index 000000000..37f2ae0fa --- /dev/null +++ b/src/Testing/MetricsTests/accumulating_metrics_for_a_single_message_and_destination.cs @@ -0,0 +1,63 @@ +using JasperFx; +using Shouldly; +using Wolverine.Runtime.Metrics; + +namespace MetricsTests; + +public class accumulating_metrics_for_a_single_message_and_destination +{ + [Fact] + public async Task pump_in_data_for_single_tenanted() + { + var accumulator = new MessageTypeMetricsAccumulator("m1", new Uri("stub://one")); + + var pump = new InstrumentPump([]); + await pump.Publish(1000, accumulator); + + var tracking = new PerTenantTracking(StorageConstants.DefaultTenantId); + foreach (var data in pump.Data) + { + data.Apply(tracking); + } + + var expected = tracking.CompileAndReset(); + + await accumulator.EntryPoint.WaitForCompletionAsync(); + + var dump = accumulator.TriggerExport(3); + var actual = dump.PerTenant.Single(); + + actual.ShouldMatch(expected); + } + + [Fact] + public async Task pump_in_data_for_multiple_tenants() + { + var accumulator = new MessageTypeMetricsAccumulator("m1", new Uri("stub://one")); + + var pump = new InstrumentPump(["t1", "t2", "t3", "t4"]); + await pump.Publish(1000, "t1", accumulator); + await pump.Publish(1000, "t2", accumulator); + await pump.Publish(1000, "t3", accumulator); + await pump.Publish(1000, "t4", accumulator); + + await accumulator.EntryPoint.WaitForCompletionAsync(); + + var metrics = accumulator.TriggerExport(3); + metrics.PerTenant[0].ShouldMatch(pump.GetExpectedForTenantId("t1")); + metrics.PerTenant[1].ShouldMatch(pump.GetExpectedForTenantId("t2")); + metrics.PerTenant[2].ShouldMatch(pump.GetExpectedForTenantId("t3")); + metrics.PerTenant[3].ShouldMatch(pump.GetExpectedForTenantId("t4")); + } +} + +public static class TestingExtensions +{ + public static void ShouldMatch(this PerTenantMetrics actual, PerTenantMetrics expected) + { + actual.TenantId.ShouldBe(expected.TenantId); + actual.Executions.ShouldBe(expected.Executions); + actual.EffectiveTime.ShouldBe(expected.EffectiveTime); + actual.Executions.ShouldBe(expected.Executions); + } +} \ No newline at end of file diff --git a/src/Testing/MetricsTests/end_to_end_publishing_to_critter_watch.cs b/src/Testing/MetricsTests/end_to_end_publishing_to_critter_watch.cs new file mode 100644 index 000000000..dcadaf0a5 --- /dev/null +++ b/src/Testing/MetricsTests/end_to_end_publishing_to_critter_watch.cs @@ -0,0 +1,65 @@ +using JasperFx.Core; +using Newtonsoft.Json; +using Shouldly; +using Wolverine; +using Xunit.Abstractions; + +namespace MetricsTests; + +public class end_to_end_publishing_to_critter_watch +{ + private readonly ITestOutputHelper _output; + + public end_to_end_publishing_to_critter_watch(ITestOutputHelper output) + { + _output = output; + } + + [Fact] + public async Task run_for_quite_awhile_in_critter_watch_mode() + { + await using var pump = new MessagePump(); + + await pump.PumpMessagesAsync(WolverineMetricsMode.CritterWatch,30.Seconds()); + + var metrics = MetricsCollectionHandler.Collected; + + metrics.Length.ShouldBeGreaterThan(0); + + foreach (var metric in metrics) + { + _output.WriteLine(JsonConvert.SerializeObject(metric, Formatting.Indented)); + + // metric.PerTenant[0].Executions.Count.ShouldBeGreaterThan(0); + // metric.PerTenant[0].Executions.TotalTime.ShouldBeGreaterThan(0); + // metric.PerTenant[0].EffectiveTime.Count.ShouldBeGreaterThan(0); + // metric.PerTenant[0].EffectiveTime.TotalTime.ShouldBeGreaterThan(0); + + + } + } + + [Fact] + public async Task run_for_quite_awhile_in_hybrid_mode() + { + await using var pump = new MessagePump(); + + await pump.PumpMessagesAsync(WolverineMetricsMode.Hybrid,30.Seconds()); + + var metrics = MetricsCollectionHandler.Collected; + + metrics.Length.ShouldBeGreaterThan(0); + + foreach (var metric in metrics) + { + _output.WriteLine(JsonConvert.SerializeObject(metric, Formatting.Indented)); + + // metric.PerTenant[0].Executions.Count.ShouldBeGreaterThan(0); + // metric.PerTenant[0].Executions.TotalTime.ShouldBeGreaterThan(0); + // metric.PerTenant[0].EffectiveTime.Count.ShouldBeGreaterThan(0); + // metric.PerTenant[0].EffectiveTime.TotalTime.ShouldBeGreaterThan(0); + + + } + } +} \ No newline at end of file diff --git a/src/Wolverine/Envelope.Internals.cs b/src/Wolverine/Envelope.Internals.cs index 5c8734ecc..a035fed8e 100644 --- a/src/Wolverine/Envelope.Internals.cs +++ b/src/Wolverine/Envelope.Internals.cs @@ -102,6 +102,11 @@ internal long StopTiming() return _timer.ElapsedMilliseconds; } + /// + /// How long did the current execution take? + /// + internal long ExecutionTime => _timer.ElapsedMilliseconds; + /// /// /// diff --git a/src/Wolverine/Runtime/Handlers/Executor.cs b/src/Wolverine/Runtime/Handlers/Executor.cs index aa4a68d03..9643b5328 100644 --- a/src/Wolverine/Runtime/Handlers/Executor.cs +++ b/src/Wolverine/Runtime/Handlers/Executor.cs @@ -187,7 +187,7 @@ public async Task ExecuteAsync(MessageContext context, Cancellati _tracker.ExecutionFinished(envelope); - return MessageSucceededContinuation.Instance; + return new MessageSucceededContinuation(_tracker); } catch (Exception e) { @@ -271,14 +271,16 @@ public static IExecutor Build(IWolverineRuntime runtime, ObjectPool contextPool, - HandlerGraph handlerGraph, IMessageHandler handler) + HandlerGraph handlerGraph, IMessageHandler handler, IMessageTracker tracker) { var chain = (handler as MessageHandler)?.Chain; var timeoutSpan = chain?.DetermineMessageTimeout(runtime.Options) ?? 5.Seconds(); var rules = chain?.Failures.CombineRules(handlerGraph.Failures) ?? handlerGraph.Failures; - return new Executor(contextPool, runtime, handler, rules, timeoutSpan); + var logger = runtime.LoggerFactory.CreateLogger(handler.MessageType); + + return new Executor(contextPool, logger, handler, tracker, rules, timeoutSpan); } } \ No newline at end of file diff --git a/src/Wolverine/Runtime/IWolverineRuntime.cs b/src/Wolverine/Runtime/IWolverineRuntime.cs index eff6ff6a2..a3bd20ad8 100644 --- a/src/Wolverine/Runtime/IWolverineRuntime.cs +++ b/src/Wolverine/Runtime/IWolverineRuntime.cs @@ -6,6 +6,7 @@ using Wolverine.Persistence.Durability; using Wolverine.Runtime.Agents; using Wolverine.Runtime.Handlers; +using Wolverine.Runtime.Metrics; using Wolverine.Runtime.RemoteInvocation; using Wolverine.Runtime.Routing; @@ -26,6 +27,9 @@ public interface IWolverineRuntime IReplyTracker Replies { get; } IEndpointCollection Endpoints { get; } Meter Meter { get; } + + MetricsAccumulator MetricsAccumulator { get; } + ILoggerFactory LoggerFactory { get; } IAgentRuntime Agents { get; } diff --git a/src/Wolverine/Runtime/MessageSucceededContinuation.cs b/src/Wolverine/Runtime/MessageSucceededContinuation.cs index ab32331db..fa8195b96 100644 --- a/src/Wolverine/Runtime/MessageSucceededContinuation.cs +++ b/src/Wolverine/Runtime/MessageSucceededContinuation.cs @@ -1,6 +1,7 @@ using System.Diagnostics; using Microsoft.Extensions.Logging; using Wolverine.ErrorHandling; +using Wolverine.Logging; namespace Wolverine.Runtime; @@ -8,10 +9,17 @@ public class MessageSucceededContinuation : IContinuation { public static readonly MessageSucceededContinuation Instance = new(); + private IMessageTracker? _tracker; + private MessageSucceededContinuation() { } + public MessageSucceededContinuation(IMessageTracker tracker) + { + _tracker = tracker; + } + public async ValueTask ExecuteAsync(IEnvelopeLifecycle lifecycle, IWolverineRuntime runtime, DateTimeOffset now, Activity? activity) @@ -22,7 +30,7 @@ public async ValueTask ExecuteAsync(IEnvelopeLifecycle lifecycle, await lifecycle.CompleteAsync(); - runtime.MessageTracking.MessageSucceeded(lifecycle.Envelope!); + (_tracker ?? runtime.MessageTracking).MessageSucceeded(lifecycle.Envelope!); } catch (Exception ex) { diff --git a/src/Wolverine/Runtime/Metrics/IHandlerMetricsData.cs b/src/Wolverine/Runtime/Metrics/IHandlerMetricsData.cs new file mode 100644 index 000000000..f9a37c3bb --- /dev/null +++ b/src/Wolverine/Runtime/Metrics/IHandlerMetricsData.cs @@ -0,0 +1,7 @@ +namespace Wolverine.Runtime.Metrics; + +public interface IHandlerMetricsData +{ + string TenantId { get; } + void Apply(PerTenantTracking tracking); +} \ No newline at end of file diff --git a/src/Wolverine/Runtime/Metrics/MessageHandlingCounts.cs b/src/Wolverine/Runtime/Metrics/MessageHandlingCounts.cs new file mode 100644 index 000000000..55579f23b --- /dev/null +++ b/src/Wolverine/Runtime/Metrics/MessageHandlingCounts.cs @@ -0,0 +1,35 @@ +using JasperFx; +using JasperFx.Core; + +namespace Wolverine.Runtime.Metrics; + + +public class MessageHandlingCounts +{ + public string MessageType { get; } + public Uri Destination { get; } + + public MessageHandlingCounts(string messageType, Uri destination) + { + MessageType = messageType; + Destination = destination; + } + + public LightweightCache PerTenant { get; } = + new(tenantId => new PerTenantTracking(tenantId)); + + public void Increment(IHandlerMetricsData metricsData) + { + var perTenant = PerTenant[metricsData.TenantId ?? StorageConstants.DefaultTenantId]; + metricsData.Apply(perTenant); + } + + public void Clear() + { + foreach (var tracking in PerTenant) + { + tracking.Clear(); + } + + } +} \ No newline at end of file diff --git a/src/Wolverine/Runtime/Metrics/MessageHandlingMetrics.cs b/src/Wolverine/Runtime/Metrics/MessageHandlingMetrics.cs new file mode 100644 index 000000000..b266eaefe --- /dev/null +++ b/src/Wolverine/Runtime/Metrics/MessageHandlingMetrics.cs @@ -0,0 +1,18 @@ +using Wolverine.Persistence.Durability.DeadLetterManagement; + +namespace Wolverine.Runtime.Metrics; + +public record MessageHandlingMetrics( + int NodeNumber, + string MessageType, + Uri Destination, + TimeRange Range, + PerTenantMetrics[] PerTenant); + +public record PerTenantMetrics(string TenantId, Executions Executions, EffectiveTime EffectiveTime, ExceptionCounts[] Exceptions); + +public record Executions(int Count, long TotalTime); + +public record EffectiveTime(int Count, double TotalTime); + +public record ExceptionCounts(string ExceptionType, int Failures, int DeadLetters); diff --git a/src/Wolverine/Runtime/Metrics/MessageMetrics.cs b/src/Wolverine/Runtime/Metrics/MessageMetrics.cs new file mode 100644 index 000000000..236565ab7 --- /dev/null +++ b/src/Wolverine/Runtime/Metrics/MessageMetrics.cs @@ -0,0 +1,8 @@ +using Wolverine.Persistence.Durability.DeadLetterManagement; + +namespace Wolverine.Runtime.Metrics; + +/// +/// Message type to send remotely +/// +public record MessageMetrics(MessageHandlingMetrics[] Handled); \ No newline at end of file diff --git a/src/Wolverine/Runtime/Metrics/MessageTypeMetricsAccumulator.cs b/src/Wolverine/Runtime/Metrics/MessageTypeMetricsAccumulator.cs new file mode 100644 index 000000000..6f7b698bd --- /dev/null +++ b/src/Wolverine/Runtime/Metrics/MessageTypeMetricsAccumulator.cs @@ -0,0 +1,66 @@ +using JasperFx.Blocks; +using JasperFx.Core; +using Wolverine.Persistence.Durability.DeadLetterManagement; + +namespace Wolverine.Runtime.Metrics; + +public class MessageTypeMetricsAccumulator +{ + private readonly object _syncLock = new(); + + public string MessageType { get; } + public Uri Destination { get; } + + public MessageTypeMetricsAccumulator(string messageType, Uri destination) + { + MessageType = messageType; + Destination = destination; + + Counts = new MessageHandlingCounts(messageType, destination); + var processor = new Block(Process); + EntryPoint = processor.BatchUpstream(250.Milliseconds(), 500); + } + + public DateTimeOffset Starting { get; private set; } = DateTimeOffset.UtcNow; + + public MessageHandlingCounts Counts { get; } + + public IBlock EntryPoint { get; } + + public void Process(IHandlerMetricsData[] instruments) + { + lock (_syncLock) + { + foreach (var instrument in instruments) + { + try + { + Counts.Increment(instrument); + } + catch (Exception ) + { + // for now + } + } + } + } + + public MessageHandlingMetrics TriggerExport(int nodeNumber) + { + lock (_syncLock) + { + var time = DateTimeOffset.UtcNow; + + var metrics = new MessageHandlingMetrics( + nodeNumber, + MessageType, + Destination, + new TimeRange(Starting, time), + Counts.PerTenant.OrderBy(x => x.TenantId).Select(x => x.CompileAndReset()).ToArray()); + + Starting = time; + + return metrics; + } + } +} \ No newline at end of file diff --git a/src/Wolverine/Runtime/Metrics/MetricsAccumulator.cs b/src/Wolverine/Runtime/Metrics/MetricsAccumulator.cs new file mode 100644 index 000000000..79cb7cf2a --- /dev/null +++ b/src/Wolverine/Runtime/Metrics/MetricsAccumulator.cs @@ -0,0 +1,104 @@ +using System.Collections.Immutable; +using JasperFx.Core; +using Microsoft.Extensions.Logging; +using Wolverine.Configuration; + +namespace Wolverine.Runtime.Metrics; + +// TODO -- make this lazy on WolverineRuntime +public class MetricsAccumulator : IAsyncDisposable +{ + private readonly IWolverineRuntime _runtime; + private readonly object _syncLock = new(); + + private ImmutableArray _accumulators = ImmutableArray.Empty; + private Task _runner; + + public MetricsAccumulator(IWolverineRuntime runtime) + { + _runtime = runtime; + } + + public MessageTypeMetricsAccumulator FindAccumulator(string messageTypeName, Endpoint endpoint) + { + var endpointUri = endpoint.Uri; + return FindAccumulator(messageTypeName, endpointUri); + } + + public MessageTypeMetricsAccumulator FindAccumulator(string messageTypeName, Uri endpointUri) + { + var accumulator = + _accumulators.FirstOrDefault(x => x.MessageType == messageTypeName && x.Destination == endpointUri); + + if (accumulator != null) + { + return accumulator; + } + + lock (_syncLock) + { + accumulator = + _accumulators.FirstOrDefault(x => x.MessageType == messageTypeName && x.Destination == endpointUri); + + if (accumulator != null) + { + return accumulator; + } + + accumulator = new MessageTypeMetricsAccumulator(messageTypeName, endpointUri); + _accumulators = _accumulators.Add(accumulator); + } + + return accumulator; + } + + public async ValueTask DrainAsync() + { + var tasks = _accumulators.Select(x => x.EntryPoint.WaitForCompletionAsync()); + await Task.WhenAll(tasks); + } + + public DateTimeOffset From { get; private set; } = DateTimeOffset.UtcNow; + + public void Start() + { + _runner = Task.Run(async () => + { + try + { + while (!_runtime.Cancellation.IsCancellationRequested) + { + await Task.Delay(_runtime.Options.Metrics.SamplingPeriod); + try + { + var bus = new MessageBus(_runtime); + + foreach (var accumulator in _accumulators) + { + var metrics = accumulator.TriggerExport(_runtime.DurabilitySettings.AssignedNodeNumber); + + if (metrics.PerTenant.Length > 0) + { + await bus.PublishAsync(metrics); + } + } + } + catch (Exception e) + { + _runtime.Logger.LogError(e, "Error trying to export message handler metrics"); + } + } + } + catch (OperationCanceledException e) + { + // Nothing + } + }, _runtime.Cancellation); + } + + public ValueTask DisposeAsync() + { + _runner.SafeDispose(); + return new ValueTask(); + } +} \ No newline at end of file diff --git a/src/Wolverine/Runtime/Metrics/PerTenantTracking.cs b/src/Wolverine/Runtime/Metrics/PerTenantTracking.cs new file mode 100644 index 000000000..a15edda3d --- /dev/null +++ b/src/Wolverine/Runtime/Metrics/PerTenantTracking.cs @@ -0,0 +1,55 @@ +namespace Wolverine.Runtime.Metrics; + +public class PerTenantTracking +{ + public string TenantId { get; } + + public PerTenantTracking(string tenantId) + { + TenantId = tenantId; + } + + public int Executions { get; set; } + public long TotalExecutionTime { get; set; } + + public int Completions { get; set; } + public double TotalEffectiveTime { get; set; } + + public Dictionary DeadLetterCounts { get; } = new(); + public Dictionary Failures { get; } = new(); + + public PerTenantMetrics CompileAndReset() + { + var exceptionTypes = DeadLetterCounts.Keys.Union(Failures.Keys).ToArray(); + + var response = new PerTenantMetrics( + TenantId, + new Executions(Executions, TotalExecutionTime), + new EffectiveTime(Completions, TotalEffectiveTime), + exceptionTypes.OrderBy(x => x).Select(exceptionType => + { + int failures = 0; + int deadLetters = 0; + DeadLetterCounts.TryGetValue(exceptionType, out deadLetters); + Failures.TryGetValue(exceptionType, out failures); + + return new ExceptionCounts(exceptionType, failures, deadLetters); + }).ToArray() + ); + + Clear(); + + return response; + } + + public void Clear() + { + Executions = 0; + TotalExecutionTime = 0; + Completions = 0; + TotalEffectiveTime = 0; + DeadLetterCounts.Clear(); + Failures.Clear(); + } + +} \ No newline at end of file diff --git a/src/Wolverine/Runtime/Metrics/RecordDeadLetter.cs b/src/Wolverine/Runtime/Metrics/RecordDeadLetter.cs new file mode 100644 index 000000000..944383fd4 --- /dev/null +++ b/src/Wolverine/Runtime/Metrics/RecordDeadLetter.cs @@ -0,0 +1,12 @@ +namespace Wolverine.Runtime.Metrics; + +public record RecordDeadLetter(string ExceptionType, string TenantId) : IHandlerMetricsData +{ + public void Apply(PerTenantTracking tracking) + { + if (!tracking.DeadLetterCounts.TryAdd(ExceptionType, 1)) + { + tracking.DeadLetterCounts[ExceptionType] += 1; + } + } +} \ No newline at end of file diff --git a/src/Wolverine/Runtime/Metrics/RecordEffectiveTime.cs b/src/Wolverine/Runtime/Metrics/RecordEffectiveTime.cs new file mode 100644 index 000000000..dfda4405d --- /dev/null +++ b/src/Wolverine/Runtime/Metrics/RecordEffectiveTime.cs @@ -0,0 +1,10 @@ +namespace Wolverine.Runtime.Metrics; + +public record RecordEffectiveTime(double Time, string TenantId) : IHandlerMetricsData +{ + public void Apply(PerTenantTracking tracking) + { + tracking.Completions++; + tracking.TotalEffectiveTime += Time; + } +} \ No newline at end of file diff --git a/src/Wolverine/Runtime/Metrics/RecordExecutionTime.cs b/src/Wolverine/Runtime/Metrics/RecordExecutionTime.cs new file mode 100644 index 000000000..6907c9169 --- /dev/null +++ b/src/Wolverine/Runtime/Metrics/RecordExecutionTime.cs @@ -0,0 +1,10 @@ +namespace Wolverine.Runtime.Metrics; + +public record RecordExecutionTime(long Time, string TenantId) : IHandlerMetricsData +{ + public void Apply(PerTenantTracking tracking) + { + tracking.Executions++; + tracking.TotalExecutionTime += Time; + } +} \ No newline at end of file diff --git a/src/Wolverine/Runtime/Metrics/RecordFailure.cs b/src/Wolverine/Runtime/Metrics/RecordFailure.cs new file mode 100644 index 000000000..a4804c8c3 --- /dev/null +++ b/src/Wolverine/Runtime/Metrics/RecordFailure.cs @@ -0,0 +1,12 @@ +namespace Wolverine.Runtime.Metrics; + +public record RecordFailure(string ExceptionType, string TenantId) : IHandlerMetricsData +{ + public void Apply(PerTenantTracking tracking) + { + if (!tracking.Failures.TryAdd(ExceptionType, 1)) + { + tracking.Failures[ExceptionType] += 1; + } + } +} \ No newline at end of file diff --git a/src/Wolverine/Runtime/Wolverine.ExecutorFactory.cs b/src/Wolverine/Runtime/Wolverine.ExecutorFactory.cs index dc94206c6..f73e7dc4b 100644 --- a/src/Wolverine/Runtime/Wolverine.ExecutorFactory.cs +++ b/src/Wolverine/Runtime/Wolverine.ExecutorFactory.cs @@ -1,6 +1,10 @@ -using Wolverine.Configuration; +using JasperFx.Core.Reflection; +using Wolverine.Configuration; +using Wolverine.Logging; +using Wolverine.Runtime.Agents; using Wolverine.Runtime.Handlers; using Wolverine.Runtime.Partitioning; +using Wolverine.Util; namespace Wolverine.Runtime; @@ -33,10 +37,22 @@ IExecutor IExecutorFactory.BuildFor(Type messageType, Endpoint endpoint) handler = batching.BuildHandler(this); } } + + IMessageTracker tracker = this; + if (!messageType.CanBeCastTo() && Options.Metrics.Mode == WolverineMetricsMode.CritterWatch) + { + var accumulator = MetricsAccumulator.FindAccumulator(messageType.ToMessageTypeName(), endpoint); + tracker = new DirectMetricsPublishingMessageTracker(this, accumulator.EntryPoint); + } + else if (!messageType.CanBeCastTo() && Options.Metrics.Mode == WolverineMetricsMode.Hybrid) + { + var accumulator = MetricsAccumulator.FindAccumulator(messageType.ToMessageTypeName(), endpoint); + tracker = new HybridMetricsPublishingMessageTracker(this, accumulator.EntryPoint); + } var executor = handler == null ? new NoHandlerExecutor(messageType, this) - : Executor.Build(this, ExecutionPool, Handlers, handler); + : Executor.Build(this, ExecutionPool, Handlers, handler, tracker); return executor; } diff --git a/src/Wolverine/Runtime/WolverineRuntime.DirectMetrics.cs b/src/Wolverine/Runtime/WolverineRuntime.DirectMetrics.cs new file mode 100644 index 000000000..10f575c09 --- /dev/null +++ b/src/Wolverine/Runtime/WolverineRuntime.DirectMetrics.cs @@ -0,0 +1,118 @@ +using JasperFx.Blocks; +using JasperFx.Core.Reflection; +using Microsoft.Extensions.Logging; +using Wolverine.Logging; +using Wolverine.Runtime.Metrics; +using Wolverine.Tracking; + +namespace Wolverine.Runtime; + +public partial class WolverineRuntime +{ + internal class DirectMetricsPublishingMessageTracker : IMessageTracker + { + private readonly WolverineRuntime _runtime; + private readonly IBlock _sink; + private readonly string _serviceName; + private readonly Guid _uniqueNodeId; + + public DirectMetricsPublishingMessageTracker(WolverineRuntime runtime, IBlock sink) + { + _runtime = runtime; + _sink = sink; + _serviceName = runtime._serviceName; + _uniqueNodeId = runtime._uniqueNodeId; + Logger = _runtime.Logger; + } + + public void LogException(Exception ex, object? correlationId = null, string message = "Exception detected:") + { + _runtime.LogException(ex, correlationId, message); + } + + public ILogger Logger { get; } + + public void Sent(Envelope envelope) + { + // I think we'll have a different mechanism for this + _runtime.ActiveSession?.MaybeRecord(MessageEventType.Sent, envelope, _serviceName, _uniqueNodeId); + _sent(Logger, envelope.CorrelationId!, envelope.GetMessageTypeName(), envelope.Id, + envelope.Destination?.ToString() ?? string.Empty, + null); + } + + public void Received(Envelope envelope) + { + _runtime.ActiveSession?.Record(MessageEventType.Received, envelope, _serviceName, _uniqueNodeId); + _received(Logger, envelope.CorrelationId!, envelope.GetMessageTypeName(), envelope.Id, + envelope.Destination?.ToString() ?? string.Empty, + envelope.ReplyUri?.ToString() ?? string.Empty, null); + } + + public void ExecutionStarted(Envelope envelope) + { + _runtime.ExecutionStarted(envelope); + } + + public void ExecutionFinished(Envelope envelope) + { + var executionTime = envelope.StopTiming(); + if (executionTime > 0) + { + _sink.Post(new RecordExecutionTime(executionTime, envelope.TenantId)); + } + + _runtime.ActiveSession?.Record(MessageEventType.ExecutionFinished, envelope, _serviceName, _uniqueNodeId); + } + + public void ExecutionFinished(Envelope envelope, Exception exception) + { + ExecutionFinished(envelope); + _sink.Post(new RecordFailure(exception.GetType().FullNameInCode(), envelope.TenantId)); + } + + public void MessageSucceeded(Envelope envelope) + { + var time = DateTimeOffset.UtcNow.Subtract(envelope.SentAt.ToUniversalTime()).TotalMilliseconds; + _sink.Post(new RecordEffectiveTime(time, envelope.TenantId)); + + _runtime.ActiveSession?.Record(MessageEventType.MessageSucceeded, envelope, _serviceName, _uniqueNodeId); + } + + public void MessageFailed(Envelope envelope, Exception ex) + { + var time = DateTimeOffset.UtcNow.Subtract(envelope.SentAt.ToUniversalTime()).TotalMilliseconds; + _sink.Post(new RecordEffectiveTime(time, envelope.TenantId)); + _sink.Post(new RecordDeadLetter(ex.GetType().FullNameInCode(), envelope.TenantId)); + + _runtime.ActiveSession?.Record(MessageEventType.Sent, envelope, _serviceName, _uniqueNodeId, ex); + } + + public void NoHandlerFor(Envelope envelope) + { + _runtime.NoHandlerFor(envelope); + } + + public void NoRoutesFor(Envelope envelope) + { + _runtime.NoRoutesFor(envelope); + } + + public void MovedToErrorQueue(Envelope envelope, Exception ex) + { + _runtime.ActiveSession?.Record(MessageEventType.MovedToErrorQueue, envelope, _serviceName, _uniqueNodeId); + _movedToErrorQueue(Logger, envelope, ex); + _sink.Post(new RecordDeadLetter(ex.GetType().FullNameInCode(), envelope.TenantId)); + } + + public void DiscardedEnvelope(Envelope envelope) + { + _undeliverable(Logger, envelope, null); + } + + public void Requeued(Envelope envelope) + { + _runtime.Requeued(envelope); + } + } +} \ No newline at end of file diff --git a/src/Wolverine/Runtime/WolverineRuntime.Disposal.cs b/src/Wolverine/Runtime/WolverineRuntime.Disposal.cs index f70a773bc..85a5c1095 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.Disposal.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.Disposal.cs @@ -33,5 +33,10 @@ async ValueTask IAsyncDisposable.DisposeAsync() { await definition.As().DisposeAsync(); } + + if (_accumulator.IsValueCreated) + { + await _accumulator.Value.DisposeAsync(); + } } } \ No newline at end of file diff --git a/src/Wolverine/Runtime/WolverineRuntime.HostService.cs b/src/Wolverine/Runtime/WolverineRuntime.HostService.cs index 6a1df99ea..bd37c3f3b 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.HostService.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.HostService.cs @@ -175,7 +175,7 @@ public async Task StopAsync(CancellationToken cancellationToken) return; } - _agentCancellation.Cancel(); + await _agentCancellation.CancelAsync(); _hasStopped = true; @@ -210,6 +210,11 @@ public async Task StopAsync(CancellationToken cancellationToken) await teardownAgentsAsync(); await _endpoints.DrainAsync(); + + if (_accumulator.IsValueCreated) + { + await _accumulator.Value.DrainAsync(); + } } DurabilitySettings.Cancel(); @@ -235,6 +240,12 @@ private void startInMemoryScheduledJobs() private async Task startMessagingTransportsAsync() { + // Start up metrics collection + if (Options.Metrics.Mode != WolverineMetricsMode.SystemDiagnosticsMeter) + { + _accumulator.Value.Start(); + } + discoverListenersFromConventions(); // No local queues if running in Serverless diff --git a/src/Wolverine/Runtime/WolverineRuntime.HybridMetricsPublishingMessageTracker.cs b/src/Wolverine/Runtime/WolverineRuntime.HybridMetricsPublishingMessageTracker.cs new file mode 100644 index 000000000..d380dbea7 --- /dev/null +++ b/src/Wolverine/Runtime/WolverineRuntime.HybridMetricsPublishingMessageTracker.cs @@ -0,0 +1,101 @@ +using JasperFx.Blocks; +using JasperFx.Core.Reflection; +using Wolverine.Logging; +using Wolverine.Runtime.Metrics; + +namespace Wolverine.Runtime; + +public partial class WolverineRuntime +{ + internal class HybridMetricsPublishingMessageTracker : IMessageTracker + { + private readonly WolverineRuntime _runtime; + private readonly IBlock _sink; + + public HybridMetricsPublishingMessageTracker(WolverineRuntime runtime, IBlock sink) + { + _runtime = runtime; + _sink = sink; + } + + public void LogException(Exception ex, object? correlationId = null, string message = "Exception detected:") + { + _runtime.LogException(ex, correlationId, message); + } + + public void Sent(Envelope envelope) + { + _runtime.Sent(envelope); + } + + public void Received(Envelope envelope) + { + _runtime.Received(envelope); + } + + public void ExecutionStarted(Envelope envelope) + { + _runtime.ExecutionStarted(envelope); + } + + public void ExecutionFinished(Envelope envelope) + { + var executionTime = envelope.StopTiming(); + if (executionTime > 0) + { + _sink.Post(new RecordExecutionTime(executionTime, envelope.TenantId)); + } + + _runtime.ExecutionFinished(envelope); + } + + public void ExecutionFinished(Envelope envelope, Exception exception) + { + _runtime.ExecutionFinished(envelope, exception); + _sink.Post(new RecordFailure(exception.GetType().FullNameInCode(), envelope.TenantId)); + } + + public void MessageSucceeded(Envelope envelope) + { + var time = DateTimeOffset.UtcNow.Subtract(envelope.SentAt.ToUniversalTime()).TotalMilliseconds; + _sink.Post(new RecordEffectiveTime(time, envelope.TenantId)); + + _runtime.MessageSucceeded(envelope); + } + + public void MessageFailed(Envelope envelope, Exception ex) + { + var time = DateTimeOffset.UtcNow.Subtract(envelope.SentAt.ToUniversalTime()).TotalMilliseconds; + _sink.Post(new RecordEffectiveTime(time, envelope.TenantId)); + _sink.Post(new RecordDeadLetter(ex.GetType().FullNameInCode(), envelope.TenantId)); + + _runtime.MessageFailed(envelope, ex); + } + + public void NoHandlerFor(Envelope envelope) + { + _runtime.NoHandlerFor(envelope); + } + + public void NoRoutesFor(Envelope envelope) + { + _runtime.NoRoutesFor(envelope); + } + + public void MovedToErrorQueue(Envelope envelope, Exception ex) + { + _runtime.MovedToErrorQueue(envelope, ex); + _sink.Post(new RecordDeadLetter(ex.GetType().FullNameInCode(), envelope.TenantId)); + } + + public void DiscardedEnvelope(Envelope envelope) + { + _runtime.DiscardedEnvelope(envelope); + } + + public void Requeued(Envelope envelope) + { + _runtime.Requeued(envelope); + } + } +} \ No newline at end of file diff --git a/src/Wolverine/Runtime/WolverineRuntime.Tracking.cs b/src/Wolverine/Runtime/WolverineRuntime.Tracking.cs index 7fcbfdeb2..34d1fb32e 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.Tracking.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.Tracking.cs @@ -1,6 +1,9 @@ using System.Diagnostics.Metrics; +using JasperFx.Core; +using JasperFx.Core.Reflection; using Microsoft.Extensions.Logging; using Wolverine.Logging; +using Wolverine.Runtime.Metrics; using Wolverine.Tracking; namespace Wolverine.Runtime; @@ -132,6 +135,12 @@ public void MovedToErrorQueue(Envelope envelope, Exception ex) { ActiveSession?.Record(MessageEventType.MovedToErrorQueue, envelope, _serviceName, _uniqueNodeId); _movedToErrorQueue(Logger, envelope, ex); + + if (Options.Metrics.Mode != WolverineMetricsMode.SystemDiagnosticsMeter && envelope.MessageType.IsNotEmpty()) + { + var accumulator = _accumulator.Value.FindAccumulator(envelope.MessageType, envelope.Destination); + accumulator.EntryPoint.Post(new RecordDeadLetter(ex.GetType().FullNameInCode(), envelope.TenantId)); + } } public void DiscardedEnvelope(Envelope envelope) diff --git a/src/Wolverine/Runtime/WolverineRuntime.cs b/src/Wolverine/Runtime/WolverineRuntime.cs index c25c542c5..83a406517 100644 --- a/src/Wolverine/Runtime/WolverineRuntime.cs +++ b/src/Wolverine/Runtime/WolverineRuntime.cs @@ -13,6 +13,7 @@ using Wolverine.Persistence.Durability; using Wolverine.Runtime.Agents; using Wolverine.Runtime.Handlers; +using Wolverine.Runtime.Metrics; using Wolverine.Runtime.RemoteInvocation; using Wolverine.Runtime.Routing; using Wolverine.Runtime.Scheduled; @@ -25,13 +26,14 @@ public sealed partial class WolverineRuntime : IWolverineRuntime, IHostedService private readonly EndpointCollection _endpoints; private readonly LightweightCache _invokers; - private readonly string _serviceName; + private string _serviceName; private readonly Guid _uniqueNodeId; private ImHashMap _extensions = ImHashMap.Empty; private bool _hasStopped; private readonly Lazy _stores; + private readonly Lazy _accumulator; public WolverineRuntime(WolverineOptions options, IServiceContainer container, @@ -41,6 +43,8 @@ public WolverineRuntime(WolverineOptions options, Options = options; Handlers = options.HandlerGraph; + _accumulator = new Lazy(() => new MetricsAccumulator(this)); + _stores = new Lazy(() => container.Services.GetRequiredService()); @@ -100,6 +104,8 @@ public WolverineRuntime(WolverineOptions options, } } + public MetricsAccumulator MetricsAccumulator => _accumulator.Value; + public IWolverineObserver Observer { get; set; } public IServiceProvider Services => _container.Services; diff --git a/src/Wolverine/Util/Dataflow/BatchingBlock.cs b/src/Wolverine/Util/Dataflow/BatchingBlock.cs deleted file mode 100644 index b66274acd..000000000 --- a/src/Wolverine/Util/Dataflow/BatchingBlock.cs +++ /dev/null @@ -1,87 +0,0 @@ -using System.Threading.Tasks.Dataflow; -using JasperFx.Core; - -namespace Wolverine.Util.Dataflow; - -public class BatchingBlock : IDisposable -{ - private readonly BatchBlock _batchBlock; - private readonly TimeSpan _timeSpan; - private readonly Timer _trigger; - - public BatchingBlock(int milliseconds, ITargetBlock processor, - CancellationToken cancellation = default) - : this(milliseconds.Milliseconds(), processor, 100, cancellation) - { - } - - public BatchingBlock(TimeSpan timeSpan, ITargetBlock processor, int batchSize = 100, - CancellationToken cancellation = default) - { - _timeSpan = timeSpan; - _batchBlock = new BatchBlock(batchSize, new GroupingDataflowBlockOptions - { - CancellationToken = cancellation, - BoundedCapacity = DataflowBlockOptions.Unbounded - }); - - _trigger = new Timer(_ => - { - try - { - _batchBlock.TriggerBatch(); - } - catch (Exception) - { - // ignored - } - }, null, Timeout.Infinite, Timeout.Infinite); - - - _batchBlock.LinkTo(processor); - } - - public int ItemCount => _batchBlock.OutputCount; - - public Task Completion => _batchBlock.Completion; - - - public void Dispose() - { - _trigger.Dispose(); - _batchBlock.Complete(); - } - - public void Send(T item) - { - try - { - _trigger.Change(_timeSpan, Timeout.InfiniteTimeSpan); - } - catch (Exception) - { - // ignored - } - - _batchBlock.Post(item); - } - - public Task SendAsync(T item) - { - try - { - _trigger.Change(_timeSpan, Timeout.InfiniteTimeSpan); - } - catch (Exception) - { - // ignored - } - - return _batchBlock.SendAsync(item); - } - - public void Complete() - { - _batchBlock.Complete(); - } -} \ No newline at end of file diff --git a/src/Wolverine/WolverineOptions.cs b/src/Wolverine/WolverineOptions.cs index cbba36eda..53cc1d8be 100644 --- a/src/Wolverine/WolverineOptions.cs +++ b/src/Wolverine/WolverineOptions.cs @@ -36,6 +36,42 @@ public enum MultipleHandlerBehavior Separated } +public enum WolverineMetricsMode +{ + /// + /// Wolverine will publish performance metrics via System.Diagnostics.Meter + /// where any Otel tooling can be configured to scrape metrics + /// + SystemDiagnosticsMeter, + + /// + /// Wolverine will accumulate and occasionally publish performance metrics + /// via messaging to subscribers configured to listen for Wolverine performance + /// data + /// + CritterWatch, + + /// + /// Wolverine will both accumulate and publish metrics information to CritterWatch + /// *and* publish metrics via System.Diagnostics.Meter + /// + Hybrid +} + +public class MetricsOptions +{ + /// + /// How should Wolverine collect and publish metrics about message handling and publications? + /// + public WolverineMetricsMode Mode { get; set; } = WolverineMetricsMode.SystemDiagnosticsMeter; + + /// + /// If using either CritterWatch or Hybrid metrics publishing, this is the period in which + /// Wolverine will sample and publish metric data collection. Default is 5 seconds + /// + public TimeSpan SamplingPeriod { get; set; } = 5.Seconds(); +} + /// /// Completely defines and configures a Wolverine application /// @@ -46,6 +82,7 @@ public sealed partial class WolverineOptions public WolverineOptions() : this(null) { + } public WolverineOptions(string? assemblyName) @@ -82,6 +119,8 @@ public WolverineOptions(string? assemblyName) MessagePartitioning = new MessagePartitioningRules(this); } + public MetricsOptions Metrics { get; } = new(); + /// /// What is the policy within this application for whether or not it is valid to allow Service Location within /// the generated code for message handlers or HTTP endpoints. Default is AllowedByWarn. Just keep in mind that diff --git a/wolverine.sln b/wolverine.sln index 980483151..0aa833226 100644 --- a/wolverine.sln +++ b/wolverine.sln @@ -297,6 +297,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wolverine.SignalR.Tests", " EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WolverineChat", "src\Samples\WolverineChat\WolverineChat.csproj", "{08C40523-A236-4127-8400-2DBCAB3E98CF}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MetricsTests", "src\Testing\MetricsTests\MetricsTests.csproj", "{501C5BD8-41D5-4032-85C7-33F8815EA60D}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -1663,6 +1665,18 @@ Global {08C40523-A236-4127-8400-2DBCAB3E98CF}.Release|x64.Build.0 = Release|Any CPU {08C40523-A236-4127-8400-2DBCAB3E98CF}.Release|x86.ActiveCfg = Release|Any CPU {08C40523-A236-4127-8400-2DBCAB3E98CF}.Release|x86.Build.0 = Release|Any CPU + {501C5BD8-41D5-4032-85C7-33F8815EA60D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {501C5BD8-41D5-4032-85C7-33F8815EA60D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {501C5BD8-41D5-4032-85C7-33F8815EA60D}.Debug|x64.ActiveCfg = Debug|Any CPU + {501C5BD8-41D5-4032-85C7-33F8815EA60D}.Debug|x64.Build.0 = Debug|Any CPU + {501C5BD8-41D5-4032-85C7-33F8815EA60D}.Debug|x86.ActiveCfg = Debug|Any CPU + {501C5BD8-41D5-4032-85C7-33F8815EA60D}.Debug|x86.Build.0 = Debug|Any CPU + {501C5BD8-41D5-4032-85C7-33F8815EA60D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {501C5BD8-41D5-4032-85C7-33F8815EA60D}.Release|Any CPU.Build.0 = Release|Any CPU + {501C5BD8-41D5-4032-85C7-33F8815EA60D}.Release|x64.ActiveCfg = Release|Any CPU + {501C5BD8-41D5-4032-85C7-33F8815EA60D}.Release|x64.Build.0 = Release|Any CPU + {501C5BD8-41D5-4032-85C7-33F8815EA60D}.Release|x86.ActiveCfg = Release|Any CPU + {501C5BD8-41D5-4032-85C7-33F8815EA60D}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -1800,5 +1814,6 @@ Global {36645C4B-BE1F-4184-A14C-D5BFA3F86A86} = {1EF7D49F-DDB8-469A-88A0-4A8D6237561C} {3F62DB30-9A29-487A-9EE2-22A097E3EE3F} = {1EF7D49F-DDB8-469A-88A0-4A8D6237561C} {08C40523-A236-4127-8400-2DBCAB3E98CF} = {1EF7D49F-DDB8-469A-88A0-4A8D6237561C} + {501C5BD8-41D5-4032-85C7-33F8815EA60D} = {96119B5E-B5F0-400A-9580-B342EBE26212} EndGlobalSection EndGlobal