Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/.vitepress/config.mts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ const config: UserConfig<DefaultTheme.Config> = {
{text: 'Configuration', link: '/guide/configuration'},
{text: 'Runtime Architecture', link: '/guide/runtime'},
{text: 'Message Encryption', link: '/guide/runtime/encryption'},
{text: 'Heartbeats', link: '/guide/runtime/heartbeats'},
{text: 'Instrumentation and Metrics', link: '/guide/logging'},
{text: 'Diagnostics', link: '/guide/diagnostics'},
{text: 'Serverless Hosting', link: '/guide/serverless'},
Expand Down
66 changes: 66 additions & 0 deletions docs/guide/runtime/heartbeats.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Heartbeats

Wolverine can periodically emit a `WolverineHeartbeat` message from each running
node so that external monitoring tools (for example
[CritterWatch](https://github.com/JasperFx/CritterWatch)) can detect when a node
goes dark. Heartbeats are off by default; opt in by calling
`EnableHeartbeats`.

## Quickstart

```csharp
using Wolverine;

await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
// Enable with the default 30-second cadence
opts.EnableHeartbeats();

// ...or override the interval
// opts.EnableHeartbeats(TimeSpan.FromSeconds(10));

// Route heartbeats wherever the dashboard listens. Without a publish
// rule the heartbeat is local-only and does nothing if no in-process
// handler subscribes.
opts.PublishMessage<Wolverine.Runtime.Heartbeat.WolverineHeartbeat>()
.ToRabbitExchange("monitoring");
})
.StartAsync();
```

## What gets sent

Each heartbeat carries the bare minimum a monitor needs to attribute it back to
a node:

| Field | Source |
| ------------- | ---------------------------------------------------- |
| `ServiceName` | `WolverineOptions.ServiceName` |
| `NodeNumber` | `WolverineOptions.Durability.AssignedNodeNumber` |
| `SentAt` | UTC timestamp captured at publish |
| `Uptime` | Elapsed time since the heartbeat service started |

The publish goes through Wolverine's normal routing pipeline — apply
`PublishMessage`, `Publish().To*`, or any other publish rule the same way you
would for any application event.

## Configuration

`HeartbeatPolicy` lives at `WolverineOptions.Heartbeat`:

```csharp
opts.Heartbeat.Enabled = false; // disable without removing registration
opts.Heartbeat.Interval = 5.Seconds(); // override the cadence
```

`Enabled = false` causes the hosted service to exit at startup, which is the
recommended way to suppress heartbeats per environment (e.g. local development)
without altering the registration.

## Where this fits

Heartbeats answer "is this node still alive?" but say nothing about whether
listeners or transports are healthy. Pair them with the existing
[durability and node health](../durability/leadership-and-troubleshooting.md)
features for a fuller monitoring story.
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
using JasperFx.Core;
using Microsoft.Extensions.Logging.Abstractions;
using NSubstitute;
using Wolverine;
using Wolverine.Logging;
using Wolverine.Persistence.Durability;
using Wolverine.Runtime;
using Wolverine.Runtime.Heartbeat;
using Wolverine.Runtime.Routing;
using Xunit;

namespace CoreTests.Runtime.Heartbeat;

/// <summary>
/// Tests for <see cref="HeartbeatBackgroundService"/> and the <c>EnableHeartbeats</c>
/// registration. The service is driven against a substituted <see cref="IWolverineRuntime"/>
/// and publishes are observed as <c>RouteForPublish</c> calls on a substituted router.
/// </summary>
public class HeartbeatBackgroundServiceTests
{
    /// <summary>
    /// Builds a substituted runtime exposing the given options and returning
    /// <paramref name="router"/> as the routing for <see cref="WolverineHeartbeat"/>.
    /// </summary>
    private static IWolverineRuntime BuildRuntime(WolverineOptions options, IMessageRouter router)
    {
        var runtime = Substitute.For<IWolverineRuntime>();
        runtime.Options.Returns(options);
        runtime.MessageTracking.Returns(Substitute.For<IMessageTracker>());
        runtime.Storage.Returns(Substitute.For<IMessageStore>());
        runtime.Logger.Returns(NullLogger.Instance);
        runtime.RoutingFor(typeof(WolverineHeartbeat)).Returns(router);
        return runtime;
    }

    /// <summary>
    /// Creates options with the given service name plus a router whose
    /// <c>RouteForPublish</c> always yields no envelopes.
    /// </summary>
    private static (WolverineOptions options, IMessageRouter router) BuildPublishingOptions(string serviceName)
    {
        var options = new WolverineOptions { ServiceName = serviceName };
        // Empty envelope array so MessageBus.PublishAsync short-circuits and we don't
        // need to wire up a real persistence/sending pipeline.
        var router = Substitute.For<IMessageRouter>();
        router.RouteForPublish(Arg.Any<object>(), Arg.Any<DeliveryOptions?>())
            .Returns(Array.Empty<Envelope>());
        return (options, router);
    }

    /// <summary>
    /// Starts the service, lets it run for <paramref name="milliseconds"/>, then cancels
    /// and stops it. The start is awaited so a start-up failure surfaces in the test.
    /// </summary>
    private static async Task RunForAsync(HeartbeatBackgroundService service, int milliseconds)
    {
        using var cts = new CancellationTokenSource();
        await service.StartAsync(cts.Token);

        await Task.Delay(milliseconds);
        await cts.CancelAsync();
        await service.StopAsync(CancellationToken.None);
    }

    /// <summary>
    /// Number of <c>RouteForPublish</c> calls the substituted router has received.
    /// </summary>
    private static int CountPublishes(IMessageRouter router)
    {
        return router.ReceivedCalls()
            .Count(c => c.GetMethodInfo().Name == nameof(IMessageRouter.RouteForPublish));
    }

    [Fact]
    public async Task publishes_repeatedly_at_the_configured_interval()
    {
        var (options, router) = BuildPublishingOptions("HeartbeatService");
        options.EnableHeartbeats(50.Milliseconds());

        var runtime = BuildRuntime(options, router);
        var service = new HeartbeatBackgroundService(runtime);

        // Run the service for 250ms with a 50ms interval — expect at least 2 publishes
        await RunForAsync(service, 250);

        CountPublishes(router).ShouldBeGreaterThanOrEqualTo(2);
    }

    [Fact]
    public async Task heartbeat_carries_service_name_and_node_number()
    {
        var (options, router) = BuildPublishingOptions("CritterFleet");
        options.EnableHeartbeats(20.Milliseconds());
        options.Durability.AssignedNodeNumber = 4242;

        var runtime = BuildRuntime(options, router);
        var service = new HeartbeatBackgroundService(runtime);

        // Wait long enough for at least one publish
        await RunForAsync(service, 120);

        var heartbeats = router.ReceivedCalls()
            .Where(c => c.GetMethodInfo().Name == nameof(IMessageRouter.RouteForPublish))
            .Select(c => c.GetArguments()[0])
            .OfType<WolverineHeartbeat>()
            .ToList();

        heartbeats.ShouldNotBeEmpty();
        var first = heartbeats[0];
        first.ServiceName.ShouldBe("CritterFleet");
        first.NodeNumber.ShouldBe(4242);
        first.Uptime.ShouldBeGreaterThanOrEqualTo(TimeSpan.Zero);
    }

    [Fact]
    public async Task does_not_publish_when_disabled()
    {
        var (options, router) = BuildPublishingOptions("Disabled");
        options.EnableHeartbeats(20.Milliseconds());
        options.Heartbeat.Enabled = false;

        var runtime = BuildRuntime(options, router);
        var service = new HeartbeatBackgroundService(runtime);

        await RunForAsync(service, 120);

        CountPublishes(router).ShouldBe(0);
    }

    [Fact]
    public void enable_heartbeats_extension_sets_policy_and_registers_hosted_service()
    {
        var options = new WolverineOptions();

        options.EnableHeartbeats(7.Seconds());

        options.Heartbeat.Enabled.ShouldBeTrue();
        options.Heartbeat.Interval.ShouldBe(7.Seconds());

        options.Services.ShouldContain(d =>
            d.ServiceType == typeof(HeartbeatBackgroundService));
    }

    [Fact]
    public void enable_heartbeats_without_interval_keeps_default()
    {
        var options = new WolverineOptions();
        var defaultInterval = options.Heartbeat.Interval;

        options.EnableHeartbeats();

        options.Heartbeat.Enabled.ShouldBeTrue();
        options.Heartbeat.Interval.ShouldBe(defaultInterval);
    }
}
31 changes: 31 additions & 0 deletions src/Wolverine/HeartbeatPolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
namespace Wolverine;

/// <summary>
/// Configures the periodic <see cref="Wolverine.Runtime.Heartbeat.WolverineHeartbeat"/>
/// emission performed by <see cref="Wolverine.Runtime.Heartbeat.HeartbeatBackgroundService"/>.
/// Heartbeats are intended to give external monitoring tools (e.g. CritterWatch) a simple
/// liveness signal: each running node publishes a tiny message at a regular cadence, and
/// missing heartbeats indicate that a node has gone dark.
/// </summary>
/// <remarks>
/// The hosted service that emits heartbeats is registered through
/// <see cref="WolverineOptionsExtensions.EnableHeartbeats"/>. Setting <see cref="Enabled"/> to
/// <c>false</c> after registration will cause the background service to exit immediately on
/// startup so heartbeats can be selectively disabled per environment without removing the
/// registration.
/// </remarks>
public class HeartbeatPolicy
{
    private TimeSpan _interval = TimeSpan.FromSeconds(30);

    /// <summary>
    /// Whether heartbeat emission is enabled. Defaults to <c>true</c>. The hosted service
    /// only runs when <see cref="WolverineOptionsExtensions.EnableHeartbeats"/> has been
    /// invoked; this flag controls whether that registered service actually publishes.
    /// </summary>
    public bool Enabled { get; set; } = true;

    /// <summary>
    /// Interval between successive <see cref="Wolverine.Runtime.Heartbeat.WolverineHeartbeat"/>
    /// publishes. Defaults to 30 seconds.
    /// </summary>
    /// <remarks>
    /// The value is used as the delay between publishes, so it must be strictly positive:
    /// a zero delay would publish in a tight loop, and negative values are not a usable
    /// cadence. Rejecting them here reports the misconfiguration at the point of assignment.
    /// </remarks>
    /// <exception cref="ArgumentOutOfRangeException">
    /// Thrown when the assigned value is zero or negative. The previous value is retained.
    /// </exception>
    public TimeSpan Interval
    {
        get => _interval;
        set
        {
            if (value <= TimeSpan.Zero)
            {
                throw new ArgumentOutOfRangeException(
                    nameof(value),
                    value,
                    "The heartbeat interval must be a positive time span.");
            }

            _interval = value;
        }
    }
}
78 changes: 78 additions & 0 deletions src/Wolverine/Runtime/Heartbeat/HeartbeatBackgroundService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace Wolverine.Runtime.Heartbeat;

/// <summary>
/// Hosted service that publishes a <see cref="WolverineHeartbeat"/> on a regular cadence
/// dictated by <see cref="WolverineOptions.Heartbeat"/>. Registered through
/// <see cref="WolverineOptionsExtensions.EnableHeartbeats"/>.
/// </summary>
/// <remarks>
/// The service obtains its publish path from a freshly-constructed <see cref="MessageBus"/>
/// over the supplied <see cref="IWolverineRuntime"/>, so heartbeats traverse the normal
/// Wolverine routing pipeline. If <see cref="HeartbeatPolicy.Enabled"/> is <c>false</c>,
/// <see cref="ExecuteAsync"/> returns immediately without scheduling any work. A heartbeat
/// interval that cannot be used as a delay is logged and ends the loop rather than
/// faulting the hosted service.
/// </remarks>
public class HeartbeatBackgroundService : BackgroundService
{
    private readonly IWolverineRuntime _runtime;

    /// <summary>
    /// Creates the service over the runtime whose options, logger and routing it will use.
    /// </summary>
    /// <param name="runtime">Runtime supplying the heartbeat policy and the publish path.</param>
    public HeartbeatBackgroundService(IWolverineRuntime runtime)
    {
        _runtime = runtime;
    }

    /// <summary>
    /// For tests: time source override, defaults to <c>DateTimeOffset.UtcNow</c>.
    /// </summary>
    internal Func<DateTimeOffset> Now { get; set; } = () => DateTimeOffset.UtcNow;

    /// <summary>
    /// Waits one interval, publishes one heartbeat, and repeats until
    /// <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is shutting the service down.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var policy = _runtime.Options.Heartbeat;
        if (!policy.Enabled)
        {
            return;
        }

        // Take the uptime baseline from the same (overridable) clock that stamps SentAt,
        // so Uptime stays coherent when a test replaces Now.
        var startedAt = Now();
        var bus = new MessageBus(_runtime);

        while (!stoppingToken.IsCancellationRequested)
        {
            // Read the interval on every pass, as the policy object is mutable.
            var interval = policy.Interval;
            if (interval <= TimeSpan.Zero)
            {
                // Zero would publish in a tight loop and negative values are not a cadence.
                _runtime.Logger.LogWarning(
                    "WolverineHeartbeat interval {Interval} is not a positive time span; heartbeats will not be published",
                    interval);
                return;
            }

            try
            {
                await Task.Delay(interval, stoppingToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                return;
            }
            catch (ArgumentOutOfRangeException e)
            {
                // Task.Delay rejects delays beyond its supported maximum; do not fault the host.
                _runtime.Logger.LogWarning(
                    e,
                    "WolverineHeartbeat interval {Interval} is outside the range supported by Task.Delay; heartbeats will not be published",
                    interval);
                return;
            }

            if (stoppingToken.IsCancellationRequested)
            {
                return;
            }

            try
            {
                var now = Now();
                var heartbeat = new WolverineHeartbeat(
                    _runtime.Options.ServiceName ?? string.Empty,
                    _runtime.Options.Durability.AssignedNodeNumber,
                    now,
                    now - startedAt);

                await bus.PublishAsync(heartbeat).ConfigureAwait(false);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Only a host-initiated stop ends the loop. A cancellation raised inside the
                // publish pipeline for any other reason is handled as a publish failure below.
                return;
            }
            catch (Exception e)
            {
                // Heartbeat failure must never crash the host. Log and continue.
                _runtime.Logger.LogWarning(e, "Failed to publish WolverineHeartbeat");
            }
        }
    }
}
24 changes: 24 additions & 0 deletions src/Wolverine/Runtime/Heartbeat/WolverineHeartbeat.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
namespace Wolverine.Runtime.Heartbeat;

/// <summary>
/// Liveness message that <see cref="HeartbeatBackgroundService"/> publishes at a regular
/// cadence once heartbeats have been turned on with
/// <see cref="WolverineOptionsExtensions.EnableHeartbeats"/>. A monitoring tool such as
/// CritterWatch subscribes to <see cref="WolverineHeartbeat"/> and judges node health by
/// how regularly the messages arrive.
/// </summary>
/// <remarks>
/// The message goes through Wolverine's ordinary publish pipeline. For it to leave the
/// local node, the application has to declare a publish rule, for example
/// <c>opts.PublishMessage&lt;WolverineHeartbeat&gt;().ToRabbitExchange("monitoring")</c>.
/// When there is neither a publish rule nor a local subscriber, publishing is effectively
/// a no-op.
/// </remarks>
/// <param name="ServiceName">The logical service name, taken from <c>WolverineOptions.ServiceName</c>.</param>
/// <param name="NodeNumber">The node number assigned locally, taken from <c>Durability.AssignedNodeNumber</c>.</param>
/// <param name="SentAt">The UTC timestamp read at the moment the heartbeat was published.</param>
/// <param name="Uptime">Time elapsed between the emitting service's recorded start time and <paramref name="SentAt"/>.</param>
public record WolverineHeartbeat(string ServiceName, int NodeNumber, DateTimeOffset SentAt, TimeSpan Uptime);
9 changes: 9 additions & 0 deletions src/Wolverine/WolverineOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,15 @@ public void RegisterMessageType(Type messageType)
[ChildDescription]
public DurabilitySettings Durability { get; }

/// <summary>
/// Configuration for the periodic <see cref="Wolverine.Runtime.Heartbeat.WolverineHeartbeat"/>
/// emission used by external monitoring tools to detect node liveness. The hosted service
/// that emits heartbeats is registered through
/// <see cref="WolverineOptionsExtensions.EnableHeartbeats"/>.
/// </summary>
/// <remarks>
/// The policy instance is created together with the options object and the property has no
/// setter, so heartbeat behavior is adjusted by mutating the instance's
/// <see cref="HeartbeatPolicy.Enabled"/> and <see cref="HeartbeatPolicy.Interval"/> members.
/// </remarks>
[ChildDescription]
public HeartbeatPolicy Heartbeat { get; } = new();

/// <summary>
/// The default message execution timeout for local queues. This uses a CancellationTokenSource
/// behind the scenes, and the timeout enforcement is dependent on the usage within handlers
Expand Down
Loading
Loading