Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class AsyncDaemonHealthCheckExtensionsTests: DaemonContext
private FakeHealthCheckBuilderStub _builder = new();
private readonly TimeProvider _timeProvider = Substitute.For<TimeProvider>();
private readonly DateTime _now = DateTime.UtcNow;
private readonly ProjectionStateTracker _stateTracker = new();

public AsyncDaemonHealthCheckExtensionsTests(ITestOutputHelper output) : base(output)
{
Expand Down Expand Up @@ -51,6 +52,17 @@ public void should_add_timeprovider_to_services()
_builder.Services.ShouldContain(x => x.ServiceType == typeof(TimeProvider));
}

[Fact]
public void should_add_projectionstatetracker_to_services()
{
_builder = new();
_builder.Services.ShouldNotContain(x => x.ServiceType == typeof(ProjectionStateTracker));

_builder.AddMartenAsyncDaemonHealthCheck(200, TimeSpan.FromSeconds(5));

_builder.Services.ShouldContain(x => x.ServiceType == typeof(ProjectionStateTracker));
}

[Fact]
public void should_add_healthcheck_to_services()
{
Expand All @@ -70,7 +82,7 @@ public async Task should_be_healthy_without_events()
{
x.Projections.Add(new FakeSingleStream1Projection(), ProjectionLifecycle.Async);
});
var healthCheck = new AsyncDaemonHealthCheck(theStore, new(100), _timeProvider);
var healthCheck = new AsyncDaemonHealthCheck(theStore, new(100), _timeProvider, _stateTracker);

var result = await healthCheck.CheckHealthAsync(new());

Expand All @@ -89,7 +101,7 @@ public async Task should_be_healthy_with_one_projection_no_relevant_events()
session.Events.Append(Guid.NewGuid(), new FakeIrrellevantEvent());
await session.SaveChangesAsync();
await agent.Tracker.WaitForHighWaterMark(1);
var healthCheck = new AsyncDaemonHealthCheck(theStore, new(100), _timeProvider);
var healthCheck = new AsyncDaemonHealthCheck(theStore, new(100), _timeProvider, _stateTracker);

var result = await healthCheck.CheckHealthAsync(new());

Expand All @@ -112,7 +124,7 @@ public async Task should_be_unhealthy_with_no_projection_lag_allowed()
await session.SaveChangesAsync();
await agent.Tracker.WaitForHighWaterMark(eventCount);
await agent.Tracker.WaitForShardState(new ShardState("FakeStream2:All", eventCount), 15.Seconds());
var healthCheck = new AsyncDaemonHealthCheck(theStore, new(0), _timeProvider);
var healthCheck = new AsyncDaemonHealthCheck(theStore, new(0), _timeProvider, _stateTracker);

var result = await healthCheck.CheckHealthAsync(new());

Expand Down Expand Up @@ -143,7 +155,7 @@ public async Task should_be_healthy_with_all_projections_caught_up()
await agent.Tracker.WaitForShardState(new ShardState("FakeStream3:All", eventCount), 15.Seconds());
await agent.Tracker.WaitForShardState(new ShardState("FakeStream4:All", eventCount), 15.Seconds());
await agent.Tracker.WaitForHighWaterMark(eventCount);
var healthCheck = new AsyncDaemonHealthCheck(theStore, new(1), _timeProvider);
var healthCheck = new AsyncDaemonHealthCheck(theStore, new(1), _timeProvider, _stateTracker);

var result = await healthCheck.CheckHealthAsync(new());

Expand Down Expand Up @@ -177,7 +189,7 @@ public async Task should_be_unhealthy_with_one_projection_lagging()

await theSession.ExecuteAsync(treeCommand);

var healthCheck = new AsyncDaemonHealthCheck(theStore, new(1), _timeProvider);
var healthCheck = new AsyncDaemonHealthCheck(theStore, new(1), _timeProvider, _stateTracker);

var result = await healthCheck.CheckHealthAsync(new());

Expand Down Expand Up @@ -211,7 +223,7 @@ public async Task should_be_healthy_with_one_projection_lagging_but_within_max_s

await theSession.ExecuteAsync(treeCommand);

var healthCheck = new AsyncDaemonHealthCheck(theStore, new(1, TimeSpan.FromSeconds(30)), _timeProvider);
var healthCheck = new AsyncDaemonHealthCheck(theStore, new(1, TimeSpan.FromSeconds(30)), _timeProvider, _stateTracker);

var result = await healthCheck.CheckHealthAsync(new());

Expand Down Expand Up @@ -247,7 +259,7 @@ public async Task should_be_unhealthy_with_one_projection_lagging_for_more_than_
await theSession.ExecuteAsync(treeCommand);

var maxSameLagTime = TimeSpan.FromSeconds(30);
var healthCheck = new AsyncDaemonHealthCheck(theStore, new(1, maxSameLagTime), _timeProvider);
var healthCheck = new AsyncDaemonHealthCheck(theStore, new(1, maxSameLagTime), _timeProvider, _stateTracker);
await healthCheck.CheckHealthAsync(new());

// When
Expand Down Expand Up @@ -287,7 +299,7 @@ public async Task should_be_healthy_with_one_projection_lagging_for_more_than_ma
await theSession.ExecuteAsync(treeCommandSeqId0);

var maxSameLagTime = TimeSpan.FromSeconds(30);
var healthCheck = new AsyncDaemonHealthCheck(theStore, new(1, maxSameLagTime), _timeProvider);
var healthCheck = new AsyncDaemonHealthCheck(theStore, new(1, maxSameLagTime), _timeProvider, _stateTracker);
await healthCheck.CheckHealthAsync(new());

// When
Expand Down
25 changes: 18 additions & 7 deletions src/Marten.AspNetCore/Daemon/AsyncDaemonHealthCheckExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using System.Threading.Tasks;
using JasperFx.Core;
using JasperFx.Events.Projections;
using Marten.Events.Projections;
using Marten.Storage;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
Expand Down Expand Up @@ -51,6 +50,7 @@ public static IHealthChecksBuilder AddMartenAsyncDaemonHealthCheck(
{
builder.Services.AddSingleton(new AsyncDaemonHealthCheckSettings(maxEventLag, maxSameLagTime));
builder.Services.TryAddSingleton(TimeProvider.System);
builder.Services.TryAddSingleton<ProjectionStateTracker>();
return builder.AddCheck<AsyncDaemonHealthCheck>(
nameof(AsyncDaemonHealthCheck),
tags: new[] { "Marten", "AsyncDaemon" }
Expand All @@ -64,14 +64,19 @@ public static IHealthChecksBuilder AddMartenAsyncDaemonHealthCheck(
/// <returns></returns>
internal record AsyncDaemonHealthCheckSettings(int MaxEventLag, TimeSpan? MaxSameLagTime = null);

/// <summary>
/// Tracks projection state across multiple health check invocations
/// </summary>
internal class ProjectionStateTracker
{
public ConcurrentDictionary<string, (DateTime CheckedAt, long Sequence)> LastProjectionsChecks { get; } = new();
}

/// <summary>
/// Health check implementation
/// </summary>
internal class AsyncDaemonHealthCheck: IHealthCheck
{
private readonly ConcurrentDictionary<string, (DateTime CheckedAt, long Sequence)>
_lastProjectionsChecks = new();

/// <summary>
/// The allowed event projection processing lag compared to the HighWaterMark.
/// </summary>
Expand All @@ -92,13 +97,19 @@ internal class AsyncDaemonHealthCheck: IHealthCheck

private readonly TimeProvider _timeProvider;

/// <summary>
/// Projection state tracker that persists between health check invocations
/// </summary>
private readonly ProjectionStateTracker _stateTracker;

public AsyncDaemonHealthCheck(IDocumentStore store, AsyncDaemonHealthCheckSettings settings,
TimeProvider timeProvider)
TimeProvider timeProvider, ProjectionStateTracker stateTracker)
{
_store = store;
_timeProvider = timeProvider;
_maxEventLag = settings.MaxEventLag;
_maxSameLagTime = settings.MaxSameLagTime;
_stateTracker = stateTracker;
}

public async Task<HealthCheckResult> CheckHealthAsync(
Expand Down Expand Up @@ -181,7 +192,7 @@ public async Task<HealthCheckResult> CheckProjectionHealthAsync(IMartenDatabase
x =>
{
var (laggingSince, lastKnownPosition) =
_lastProjectionsChecks.GetValueOrDefault(x.ShardName, (now, x.Sequence));
_stateTracker.LastProjectionsChecks.GetValueOrDefault(x.ShardName, (now, x.Sequence));

var isLaggingWithSamePositionForGivenTime =
now.Subtract(laggingSince) >= _maxSameLagTime &&
Expand All @@ -194,7 +205,7 @@ public async Task<HealthCheckResult> CheckProjectionHealthAsync(IMartenDatabase

foreach (var laggingProjection in laggingProjections)
{
_lastProjectionsChecks.AddOrUpdate(
_stateTracker.LastProjectionsChecks.AddOrUpdate(
laggingProjection.ShardName,
_ => (now, laggingProjection.Sequence),
(_, _) => (now, laggingProjection.Sequence)
Expand Down
Loading