diff --git a/docs/.vitepress/config.mts b/docs/.vitepress/config.mts index d2ef2b989..ee6d2d6ad 100644 --- a/docs/.vitepress/config.mts +++ b/docs/.vitepress/config.mts @@ -131,6 +131,7 @@ const config: UserConfig = { {text: 'Configuration', link: '/guide/configuration'}, {text: 'Runtime Architecture', link: '/guide/runtime'}, {text: 'Message Encryption', link: '/guide/runtime/encryption'}, + {text: 'Heartbeats', link: '/guide/runtime/heartbeats'}, {text: 'Instrumentation and Metrics', link: '/guide/logging'}, {text: 'Diagnostics', link: '/guide/diagnostics'}, {text: 'Serverless Hosting', link: '/guide/serverless'}, diff --git a/docs/guide/runtime/heartbeats.md b/docs/guide/runtime/heartbeats.md new file mode 100644 index 000000000..67857c808 --- /dev/null +++ b/docs/guide/runtime/heartbeats.md @@ -0,0 +1,66 @@ +# Heartbeats + +Wolverine can periodically emit a `WolverineHeartbeat` message from each running +node so that external monitoring tools (for example +[CritterWatch](https://github.com/JasperFx/CritterWatch)) can detect when a node +goes dark. Heartbeats are off by default and are opted-in through +`EnableHeartbeats`. + +## Quickstart + +```csharp +using Wolverine; + +await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + // Enable with the default 30-second cadence + opts.EnableHeartbeats(); + + // ...or override the interval + // opts.EnableHeartbeats(TimeSpan.FromSeconds(10)); + + // Route heartbeats wherever the dashboard listens. Without a publish + // rule the heartbeat is local-only and does nothing if no in-process + // handler subscribes. + opts.PublishMessage() + .ToRabbitExchange("monitoring"); + }) + .RunOasisAsync(); +``` + +## What gets sent + +Each heartbeat carries the bare minimum a monitor needs to attribute it back to +a node: + +| Field | Source | +| ------------- | ---------------------------------------------------- | +| `ServiceName` | `WolverineOptions.ServiceName` | +| `NodeNumber` | `WolverineOptions.Durability.AssignedNodeNumber` | +| `SentAt` | UTC timestamp captured at publish | +| `Uptime` | Elapsed time since the heartbeat service started | + +The publish goes through Wolverine's normal routing pipeline — apply +`PublishMessage`, `Publish().To*`, or any other publish rule the same way you +would for any application event. + +## Configuration + +`HeartbeatPolicy` lives at `WolverineOptions.Heartbeat`: + +```csharp +opts.Heartbeat.Enabled = false; // disable without removing registration +opts.Heartbeat.Interval = 5.Seconds(); // override the cadence +``` + +`Enabled = false` causes the hosted service to exit at startup, which is the +recommended way to suppress heartbeats per environment (e.g. local development) +without altering the registration. + +## Where this fits + +Heartbeats answer "is this node still alive?" but say nothing about whether +listeners or transports are healthy. Pair them with the existing +[durability and node health](../durability/leadership-and-troubleshooting.md) +features for a fuller monitoring story. diff --git a/src/Testing/CoreTests/Runtime/Heartbeat/HeartbeatBackgroundServiceTests.cs b/src/Testing/CoreTests/Runtime/Heartbeat/HeartbeatBackgroundServiceTests.cs new file mode 100644 index 000000000..1eac2daaa --- /dev/null +++ b/src/Testing/CoreTests/Runtime/Heartbeat/HeartbeatBackgroundServiceTests.cs @@ -0,0 +1,140 @@ +using JasperFx.Core; +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; +using Wolverine; +using Wolverine.Logging; +using Wolverine.Persistence.Durability; +using Wolverine.Runtime; +using Wolverine.Runtime.Heartbeat; +using Wolverine.Runtime.Routing; +using Xunit; + +namespace CoreTests.Runtime.Heartbeat; + +public class HeartbeatBackgroundServiceTests +{ + private static IWolverineRuntime BuildRuntime(WolverineOptions options, IMessageRouter router) + { + var runtime = Substitute.For(); + runtime.Options.Returns(options); + runtime.MessageTracking.Returns(Substitute.For()); + runtime.Storage.Returns(Substitute.For()); + runtime.Logger.Returns(NullLogger.Instance); + runtime.RoutingFor(typeof(WolverineHeartbeat)).Returns(router); + return runtime; + } + + private static (WolverineOptions options, IMessageRouter router) BuildPublishingOptions(string serviceName) + { + var options = new WolverineOptions { ServiceName = serviceName }; + // Empty envelope array so MessageBus.PublishAsync short-circuits and we don't + // need to wire up a real persistence/sending pipeline. + var router = Substitute.For(); + router.RouteForPublish(Arg.Any(), Arg.Any()) + .Returns(Array.Empty()); + return (options, router); + } + + [Fact] + public async Task publishes_repeatedly_at_the_configured_interval() + { + var (options, router) = BuildPublishingOptions("HeartbeatService"); + options.EnableHeartbeats(50.Milliseconds()); + + var runtime = BuildRuntime(options, router); + var service = new HeartbeatBackgroundService(runtime); + + using var cts = new CancellationTokenSource(); + var execution = service.StartAsync(cts.Token); + + // Run the service for 250ms with a 50ms interval — expect at least 2 publishes + await Task.Delay(250); + await cts.CancelAsync(); + await service.StopAsync(CancellationToken.None); + + var publishCount = router.ReceivedCalls() + .Count(c => c.GetMethodInfo().Name == nameof(IMessageRouter.RouteForPublish)); + + publishCount.ShouldBeGreaterThanOrEqualTo(2); + } + + [Fact] + public async Task heartbeat_carries_service_name_and_node_number() + { + var (options, router) = BuildPublishingOptions("CritterFleet"); + options.EnableHeartbeats(20.Milliseconds()); + options.Durability.AssignedNodeNumber = 4242; + + var runtime = BuildRuntime(options, router); + var service = new HeartbeatBackgroundService(runtime); + + using var cts = new CancellationTokenSource(); + await service.StartAsync(cts.Token); + + // Wait long enough for at least one publish + await Task.Delay(120); + await cts.CancelAsync(); + await service.StopAsync(CancellationToken.None); + + var heartbeats = router.ReceivedCalls() + .Where(c => c.GetMethodInfo().Name == nameof(IMessageRouter.RouteForPublish)) + .Select(c => c.GetArguments()[0]) + .OfType() + .ToList(); + + heartbeats.ShouldNotBeEmpty(); + var first = heartbeats[0]; + first.ServiceName.ShouldBe("CritterFleet"); + first.NodeNumber.ShouldBe(4242); + first.Uptime.ShouldBeGreaterThanOrEqualTo(TimeSpan.Zero); + } + + [Fact] + public async Task does_not_publish_when_disabled() + { + var (options, router) = BuildPublishingOptions("Disabled"); + options.EnableHeartbeats(20.Milliseconds()); + options.Heartbeat.Enabled = false; + + var runtime = BuildRuntime(options, router); + var service = new HeartbeatBackgroundService(runtime); + + using var cts = new CancellationTokenSource(); + await service.StartAsync(cts.Token); + + await Task.Delay(120); + await cts.CancelAsync(); + await service.StopAsync(CancellationToken.None); + + var publishCount = router.ReceivedCalls() + .Count(c => c.GetMethodInfo().Name == nameof(IMessageRouter.RouteForPublish)); + + publishCount.ShouldBe(0); + } + + [Fact] + public void enable_heartbeats_extension_sets_policy_and_registers_hosted_service() + { + var options = new WolverineOptions(); + + options.EnableHeartbeats(7.Seconds()); + + options.Heartbeat.Enabled.ShouldBeTrue(); + options.Heartbeat.Interval.ShouldBe(7.Seconds()); + + options.Services.ShouldContain(d => + d.ServiceType == typeof(HeartbeatBackgroundService)); + } + + [Fact] + public void enable_heartbeats_without_interval_keeps_default() + { + var options = new WolverineOptions(); + var defaultInterval = options.Heartbeat.Interval; + + options.EnableHeartbeats(); + + options.Heartbeat.Enabled.ShouldBeTrue(); + options.Heartbeat.Interval.ShouldBe(defaultInterval); + } +} diff --git a/src/Wolverine/HeartbeatPolicy.cs b/src/Wolverine/HeartbeatPolicy.cs new file mode 100644 index 000000000..44d4c0a3b --- /dev/null +++ b/src/Wolverine/HeartbeatPolicy.cs @@ -0,0 +1,31 @@ +namespace Wolverine; + +/// +/// Configures the periodic +/// emission performed by . +/// Heartbeats are intended to give external monitoring tools (e.g. CritterWatch) a simple +/// liveness signal: each running node publishes a tiny message at a regular cadence, and +/// missing heartbeats indicate that a node has gone dark. +/// +/// +/// The hosted service that emits heartbeats is registered through +/// . Setting to +/// false after registration will cause the background service to exit immediately on +/// startup so heartbeats can be selectively disabled per environment without removing the +/// registration. +/// +public class HeartbeatPolicy +{ + /// + /// Whether heartbeat emission is enabled. Defaults to true. The hosted service + /// only runs when has been + /// invoked; this flag controls whether that registered service actually publishes. + /// + public bool Enabled { get; set; } = true; + + /// + /// Interval between successive + /// publishes. Defaults to 30 seconds. + /// + public TimeSpan Interval { get; set; } = TimeSpan.FromSeconds(30); +} diff --git a/src/Wolverine/Runtime/Heartbeat/HeartbeatBackgroundService.cs b/src/Wolverine/Runtime/Heartbeat/HeartbeatBackgroundService.cs new file mode 100644 index 000000000..91a39520f --- /dev/null +++ b/src/Wolverine/Runtime/Heartbeat/HeartbeatBackgroundService.cs @@ -0,0 +1,78 @@ +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace Wolverine.Runtime.Heartbeat; + +/// +/// Hosted service that publishes a on a regular cadence +/// dictated by . Registered through +/// . +/// +/// +/// The service obtains its publish path from a freshly-constructed +/// over the supplied , so heartbeats traverse the normal +/// Wolverine routing pipeline. If is false, +/// returns immediately without scheduling any work. +/// +public class HeartbeatBackgroundService : BackgroundService +{ + private readonly IWolverineRuntime _runtime; + private readonly DateTimeOffset _startedAt; + + public HeartbeatBackgroundService(IWolverineRuntime runtime) + { + _runtime = runtime; + _startedAt = DateTimeOffset.UtcNow; + } + + /// + /// For tests: time source override, defaults to DateTimeOffset.UtcNow. + /// + internal Func Now { get; set; } = () => DateTimeOffset.UtcNow; + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + var policy = _runtime.Options.Heartbeat; + if (!policy.Enabled) + { + return; + } + + var bus = new MessageBus(_runtime); + + while (!stoppingToken.IsCancellationRequested) + { + try + { + await Task.Delay(policy.Interval, stoppingToken).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + return; + } + + if (stoppingToken.IsCancellationRequested) return; + + try + { + var now = Now(); + var heartbeat = new WolverineHeartbeat( + _runtime.Options.ServiceName ?? string.Empty, + _runtime.Options.Durability.AssignedNodeNumber, + now, + now - _startedAt); + + await bus.PublishAsync(heartbeat).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + return; + } + catch (Exception e) + { + // Heartbeat failure must never crash the host. Log and continue. + _runtime.Logger.LogWarning(e, "Failed to publish WolverineHeartbeat"); + } + } + } +} diff --git a/src/Wolverine/Runtime/Heartbeat/WolverineHeartbeat.cs b/src/Wolverine/Runtime/Heartbeat/WolverineHeartbeat.cs new file mode 100644 index 000000000..8b787cc33 --- /dev/null +++ b/src/Wolverine/Runtime/Heartbeat/WolverineHeartbeat.cs @@ -0,0 +1,24 @@ +namespace Wolverine.Runtime.Heartbeat; + +/// +/// Periodic liveness signal emitted by when +/// heartbeats are enabled via . +/// External monitoring tools (e.g. CritterWatch) subscribe to +/// and infer node health from the cadence at which heartbeats arrive. +/// +/// +/// Heartbeats are routed through Wolverine's normal publish pipeline, which means consumers +/// must register an explicit publish rule (for example +/// opts.PublishMessage<WolverineHeartbeat>().ToRabbitExchange("monitoring")) for +/// the heartbeats to leave the local node. With no publish rule and no local subscriber the +/// publish is effectively a no-op. +/// +/// Logical service name from WolverineOptions.ServiceName. +/// Locally-assigned node number from Durability.AssignedNodeNumber. +/// UTC timestamp captured when the heartbeat was published. +/// Elapsed time since the heartbeat background service started. +public record WolverineHeartbeat( + string ServiceName, + int NodeNumber, + DateTimeOffset SentAt, + TimeSpan Uptime); diff --git a/src/Wolverine/WolverineOptions.cs b/src/Wolverine/WolverineOptions.cs index a65bc9b0d..83c802292 100644 --- a/src/Wolverine/WolverineOptions.cs +++ b/src/Wolverine/WolverineOptions.cs @@ -349,6 +349,15 @@ public void RegisterMessageType(Type messageType) [ChildDescription] public DurabilitySettings Durability { get; } + /// + /// Configuration for the periodic + /// emission used by external monitoring tools to detect node liveness. The hosted service + /// that emits heartbeats is registered through + /// . + /// + [ChildDescription] + public HeartbeatPolicy Heartbeat { get; } = new(); + /// /// The default message execution timeout for local queues. This uses a CancellationTokenSource /// behind the scenes, and the timeout enforcement is dependent on the usage within handlers diff --git a/src/Wolverine/WolverineOptionsExtensions.cs b/src/Wolverine/WolverineOptionsExtensions.cs new file mode 100644 index 000000000..8b27d8b2c --- /dev/null +++ b/src/Wolverine/WolverineOptionsExtensions.cs @@ -0,0 +1,41 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Wolverine.Runtime.Heartbeat; + +namespace Wolverine; + +/// +/// Extension methods that opt a instance into ancillary +/// runtime features without modifying the core options class. +/// +public static class WolverineOptionsExtensions +{ + /// + /// Enable periodic emission from this Wolverine node. + /// Heartbeats are routed through the normal publish pipeline, so callers must register + /// a publish rule (e.g. opts.PublishMessage<WolverineHeartbeat>().ToRabbitExchange("monitoring")) + /// to deliver them to an external monitoring tool such as CritterWatch. With no rule + /// configured the heartbeat is a local-only no-op unless a local subscriber exists. + /// + /// The being configured. + /// + /// Optional override for the heartbeat cadence. When null the existing + /// value (default 30 seconds) is preserved. + /// + /// The same instance for fluent chaining. + public static WolverineOptions EnableHeartbeats(this WolverineOptions opts, TimeSpan? interval = null) + { + ArgumentNullException.ThrowIfNull(opts); + + opts.Heartbeat.Enabled = true; + if (interval.HasValue) + { + opts.Heartbeat.Interval = interval.Value; + } + + opts.Services.AddSingleton(); + opts.Services.AddHostedService(sp => sp.GetRequiredService()); + + return opts; + } +}