Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Microsoft.Azure.Cosmos;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Logging;
using Wolverine.Logging;
using Wolverine.Persistence;
using Wolverine.Persistence.Durability;
using Wolverine.Runtime;
using Wolverine.Runtime.Agents;
using Wolverine.Runtime.WorkerQueues;
Expand All @@ -26,6 +29,7 @@ public partial class CosmosDbDurabilityAgent : IAgent
private readonly CancellationTokenSource _cancellation = new();
private readonly CancellationTokenSource _combined;
private PersistenceMetrics? _metrics;
private readonly DurabilityHealthSignals _health;

public CosmosDbDurabilityAgent(Container container, IWolverineRuntime runtime,
CosmosDbMessageStore parent)
Expand All @@ -41,6 +45,7 @@ public CosmosDbDurabilityAgent(Container container, IWolverineRuntime runtime,
_logger = runtime.LoggerFactory.CreateLogger<CosmosDbDurabilityAgent>();

_combined = CancellationTokenSource.CreateLinkedTokenSource(runtime.Cancellation, _cancellation.Token);
_health = new DurabilityHealthSignals(_settings);
}

public Task StartAsync(CancellationToken cancellationToken)
Expand Down Expand Up @@ -69,16 +74,26 @@ internal void StartTimers()
{
var lastExpiredTime = DateTimeOffset.UtcNow;

await tryRecoverIncomingMessages();
await tryRecoverOutgoingMessagesAsync();

if (_settings.DeadLetterQueueExpirationEnabled)
try
{
var now = DateTimeOffset.UtcNow;
if (now > lastExpiredTime.AddHours(1))
await tryRecoverIncomingMessages();
await tryRecoverOutgoingMessagesAsync();

if (_settings.DeadLetterQueueExpirationEnabled)
{
await tryDeleteExpiredDeadLetters();
var now = DateTimeOffset.UtcNow;
if (now > lastExpiredTime.AddHours(1))
{
await tryDeleteExpiredDeadLetters();
}
}

_health.RecordPollSuccess();
}
catch (Exception e) when (!_combined.IsCancellationRequested)
{
_health.RecordPollFailure(e);
_logger.LogError(e, "Recovery loop tick failed");
}

await timer.WaitForNextTickAsync(_combined.Token);
Expand All @@ -92,12 +107,41 @@ internal void StartTimers()

while (!_combined.IsCancellationRequested)
{
await runScheduledJobs();
try
{
await runScheduledJobs();
_health.RecordPollSuccess();
}
catch (Exception e) when (!_combined.IsCancellationRequested)
{
_health.RecordPollFailure(e);
_logger.LogError(e, "Scheduled-job loop tick failed");
}

await timer.WaitForNextTickAsync(_combined.Token);
}
}, _combined.Token);
}

public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context,
    CancellationToken cancellationToken = default)
{
    // Best-effort count fetch: a failure here is itself a health signal
    // (recorded on _health), never a thrown exception from the health check.
    async Task<PersistedCounts?> tryFetchCountsAsync()
    {
        try
        {
            return await _parent.Admin.FetchCountsAsync();
        }
        catch (Exception e)
        {
            _health.RecordPollFailure(e);
            return null;
        }
    }

    // Only query persisted counts while the agent is actively running;
    // Evaluate() can render a verdict from the agent status alone otherwise.
    var counts = Status == AgentStatus.Running
        ? await tryFetchCountsAsync()
        : null;

    return _health.Evaluate(Status, Uri, counts, DateTimeOffset.UtcNow);
}

private async Task tryDeleteExpiredDeadLetters()
{
var now = DateTimeOffset.UtcNow;
Expand Down
41 changes: 21 additions & 20 deletions src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Logging;
using Weasel.Core;
using Wolverine.Logging;
using Wolverine.Persistence;
using Wolverine.Persistence.Durability;
using Wolverine.RDBMS.Durability;
Expand All @@ -29,8 +30,7 @@ internal class DurabilityAgent : IAgent
private Timer? _recoveryTimer;
private Timer? _scheduledJobTimer;

private int _successCount;
private int _exceptionCount;
private readonly DurabilityHealthSignals _health;
private DateTime _lastHealthCheck = DateTime.UtcNow;

public DurabilityAgent(IWolverineRuntime runtime, IMessageDatabase database)
Expand All @@ -45,6 +45,8 @@ public DurabilityAgent(IWolverineRuntime runtime, IMessageDatabase database)

_logger = runtime.LoggerFactory.CreateLogger<DurabilityAgent>();

_health = new DurabilityHealthSignals(_settings);

_runningBlock = new Block<IAgentCommand>(async batch =>
{
if (runtime.Cancellation.IsCancellationRequested)
Expand All @@ -55,11 +57,11 @@ public DurabilityAgent(IWolverineRuntime runtime, IMessageDatabase database)
try
{
await executor.InvokeAsync(batch, new MessageBus(runtime));
Interlocked.Increment(ref _successCount);
_health.RecordPollSuccess();
}
catch (Exception e)
{
Interlocked.Increment(ref _exceptionCount);
_health.RecordPollFailure(e);
_logger.LogError(e, "Error trying to run durability agent commands");
}
});
Expand Down Expand Up @@ -232,28 +234,27 @@ public void StartScheduledJobPolling()
_settings, _settings.ScheduledJobFirstExecution, _settings.ScheduledJobPollingTime);
}

public Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context,
public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context,
CancellationToken cancellationToken = default)
{
if (Status != AgentStatus.Running)
{
return Task.FromResult(HealthCheckResult.Unhealthy($"Agent {Uri} is {Status}"));
}

var exceptions = Interlocked.Exchange(ref _exceptionCount, 0);
var successes = Interlocked.Exchange(ref _successCount, 0);
_lastHealthCheck = DateTime.UtcNow;

if (exceptions > 0 && successes == 0)
{
return Task.FromResult(HealthCheckResult.Unhealthy("All database operations failed"));
}

if (exceptions > 0)
// Skip the count fetch when the agent isn't running — the status check below will
// short-circuit anyway, and a stopped agent shouldn't spin up a fresh DB query just
// to be told the same thing.
PersistedCounts? counts = null;
if (Status == AgentStatus.Running)
{
return Task.FromResult(HealthCheckResult.Degraded("Some database operations failed"));
try
{
counts = await _database.FetchCountsAsync();
}
catch (Exception e)
{
_health.RecordPollFailure(e);
}
}

return Task.FromResult(HealthCheckResult.Healthy());
return _health.Evaluate(Status, Uri, counts, DateTimeOffset.UtcNow);
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
using JasperFx;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Logging;
using Raven.Client.Documents;
using Raven.Client.Documents.Operations;
using Wolverine.Logging;
using Wolverine.Persistence;
using Wolverine.Persistence.Durability;
using Wolverine.Runtime;
using Wolverine.Runtime.Agents;
using Wolverine.Runtime.Handlers;
Expand All @@ -28,7 +31,8 @@ public partial class RavenDbDurabilityAgent : IAgent
private readonly CancellationTokenSource _cancellation = new();
private readonly CancellationTokenSource _combined;
private PersistenceMetrics _metrics = null!;

private readonly DurabilityHealthSignals _health;

public RavenDbDurabilityAgent(IDocumentStore store, IWolverineRuntime runtime, RavenDbMessageStore parent)
{
_store = store;
Expand All @@ -40,8 +44,9 @@ public RavenDbDurabilityAgent(IDocumentStore store, IWolverineRuntime runtime, R
Uri = new Uri($"{PersistenceConstants.AgentScheme}://ravendb/durability");

_logger = runtime.LoggerFactory.CreateLogger<RavenDbDurabilityAgent>();

_combined = CancellationTokenSource.CreateLinkedTokenSource(runtime.Cancellation, _cancellation.Token);
_health = new DurabilityHealthSignals(_settings);
}

public Task StartAsync(CancellationToken cancellationToken)
Expand Down Expand Up @@ -70,18 +75,28 @@ internal void StartTimers()
while (!_combined.IsCancellationRequested)
{
var lastExpiredTime = DateTimeOffset.UtcNow;

await tryRecoverIncomingMessages();
await tryRecoverOutgoingMessagesAsync();

if (_settings.DeadLetterQueueExpirationEnabled)
try
{
// Crudely just doing this every hour
var now = DateTimeOffset.UtcNow;
if (now > lastExpiredTime.AddHours(1))
await tryRecoverIncomingMessages();
await tryRecoverOutgoingMessagesAsync();

if (_settings.DeadLetterQueueExpirationEnabled)
{
await tryDeleteExpiredDeadLetters();
// Crudely just doing this every hour
var now = DateTimeOffset.UtcNow;
if (now > lastExpiredTime.AddHours(1))
{
await tryDeleteExpiredDeadLetters();
}
}

_health.RecordPollSuccess();
}
catch (Exception e) when (!_combined.IsCancellationRequested)
{
_health.RecordPollFailure(e);
_logger.LogError(e, "Recovery loop tick failed");
}

await timer.WaitForNextTickAsync(_combined.Token);
Expand All @@ -92,16 +107,45 @@ internal void StartTimers()
{
await Task.Delay(recoveryStart, _combined.Token);
using var timer = new PeriodicTimer(_settings.ScheduledJobPollingTime);

while (!_combined.IsCancellationRequested)
{
await runScheduledJobs();
try
{
await runScheduledJobs();
_health.RecordPollSuccess();
}
catch (Exception e) when (!_combined.IsCancellationRequested)
{
_health.RecordPollFailure(e);
_logger.LogError(e, "Scheduled-job loop tick failed");
}

await timer.WaitForNextTickAsync(_combined.Token);
}
}, _combined.Token);

}

public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context,
    CancellationToken cancellationToken = default)
{
    // Best-effort count fetch: a failure here is itself a health signal
    // (recorded on _health), never a thrown exception from the health check.
    async Task<PersistedCounts?> tryFetchCountsAsync()
    {
        try
        {
            return await _parent.Admin.FetchCountsAsync();
        }
        catch (Exception e)
        {
            _health.RecordPollFailure(e);
            return null;
        }
    }

    // Only query persisted counts while the agent is actively running;
    // Evaluate() can render a verdict from the agent status alone otherwise.
    var counts = Status == AgentStatus.Running
        ? await tryFetchCountsAsync()
        : null;

    return _health.Evaluate(Status, Uri, counts, DateTimeOffset.UtcNow);
}

private async Task tryDeleteExpiredDeadLetters()
{
var now = DateTimeOffset.UtcNow;
Expand Down
Loading
Loading