Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using System.Data.Common;
using JasperFx.Core;
using Weasel.Core;
using Wolverine.RDBMS.Polling;
using Wolverine.Runtime.Agents;
using DbCommandBuilder = Weasel.Core.DbCommandBuilder;

namespace Wolverine.RDBMS.Durability;

/// <summary>
/// Releases inbox/outbox messages owned by nodes that no longer exist.
/// Used for ancillary/tenant databases that lack the wolverine_nodes table,
/// so the active node list must be provided externally.
/// </summary>
internal class ReleaseOrphanedMessagesForAncillaryOperation : IDatabaseOperation, IDoNotReturnData
{
private readonly IMessageDatabase _database;
private readonly IReadOnlyList<int> _activeNodeNumbers;

public ReleaseOrphanedMessagesForAncillaryOperation(IMessageDatabase database, IReadOnlyList<int> activeNodeNumbers)
{
_database = database;
_activeNodeNumbers = activeNodeNumbers;
}

public string Description => "Release inbox/outbox messages owned by nodes that no longer exist (ancillary database)";

public void ConfigureCommand(DbCommandBuilder builder)
{
if (_activeNodeNumbers.Count == 0) return;

var schemaName = _database.SchemaName;
var incomingTable = new DbObjectName(schemaName, DatabaseConstants.IncomingTable);
var outgoingTable = new DbObjectName(schemaName, DatabaseConstants.OutgoingTable);
var nodeList = string.Join(", ", _activeNodeNumbers);

builder.Append(
$"update {incomingTable} set {DatabaseConstants.OwnerId} = 0 where {DatabaseConstants.OwnerId} != 0 and {DatabaseConstants.OwnerId} not in ({nodeList});");
builder.Append(
$"update {outgoingTable} set {DatabaseConstants.OwnerId} = 0 where {DatabaseConstants.OwnerId} != 0 and {DatabaseConstants.OwnerId} not in ({nodeList});");
}

public Task ReadResultsAsync(DbDataReader reader, IList<Exception> exceptions, CancellationToken token)
{
return Task.CompletedTask;
}

public IEnumerable<IAgentCommand> PostProcessingCommands()
{
yield break;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using System.Data.Common;
using Weasel.Core;
using Wolverine.RDBMS.Polling;
using Wolverine.Runtime.Agents;
using DbCommandBuilder = Weasel.Core.DbCommandBuilder;

namespace Wolverine.RDBMS.Durability;

/// <summary>
/// Releases inbox/outbox messages owned by nodes that no longer exist in the wolverine_nodes table.
/// Used for main databases where the nodes table is co-located.
/// </summary>
internal class ReleaseOrphanedMessagesOperation : IDatabaseOperation, IDoNotReturnData
{
private readonly IMessageDatabase _database;

public ReleaseOrphanedMessagesOperation(IMessageDatabase database)
{
_database = database;
}

public string Description => "Release inbox/outbox messages owned by nodes that no longer exist";

public void ConfigureCommand(DbCommandBuilder builder)
{
var schemaName = _database.SchemaName;
var incomingTable = new DbObjectName(schemaName, DatabaseConstants.IncomingTable);
var outgoingTable = new DbObjectName(schemaName, DatabaseConstants.OutgoingTable);
var nodesTable = new DbObjectName(schemaName, DatabaseConstants.NodeTableName);

builder.Append(
$"update {incomingTable} set {DatabaseConstants.OwnerId} = 0 where {DatabaseConstants.OwnerId} != 0 and {DatabaseConstants.OwnerId} not in (select {DatabaseConstants.NodeNumber} from {nodesTable});");
builder.Append(
$"update {outgoingTable} set {DatabaseConstants.OwnerId} = 0 where {DatabaseConstants.OwnerId} != 0 and {DatabaseConstants.OwnerId} not in (select {DatabaseConstants.NodeNumber} from {nodesTable});");
}

public Task ReadResultsAsync(DbDataReader reader, IList<Exception> exceptions, CancellationToken token)
{
return Task.CompletedTask;
}

public IEnumerable<IAgentCommand> PostProcessingCommands()
{
yield break;
}
}
33 changes: 28 additions & 5 deletions src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,23 @@ public Task StartAsync(CancellationToken cancellationToken)

var recoveryStart = _settings.ScheduledJobFirstExecution.Add(new Random().Next(0, 1000).Milliseconds());

_recoveryTimer = new Timer(_ =>
_recoveryTimer = new Timer(async _ =>
{
var operations = buildOperationBatch();
IReadOnlyList<int>? activeNodeNumbers = null;
if (_database.Settings.Role != MessageStoreRole.Main)
{
try
{
var nodes = await _runtime.Storage.Nodes.LoadAllNodesAsync(_runtime.Cancellation);
activeNodeNumbers = nodes.Select(n => n.AssignedNodeNumber).ToList();
}
catch (Exception e)
{
_logger.LogDebug(e, "Failed to load active nodes for orphaned message detection");
}
}

var operations = buildOperationBatch(activeNodeNumbers);

var batch = new DatabaseOperationBatch(_database, operations);
_runningBlock.Post(batch);
Expand Down Expand Up @@ -156,7 +170,7 @@ private bool isTimeToPruneNodeEventRecords()
return false;
}

private IDatabaseOperation[] buildOperationBatch()
private IDatabaseOperation[] buildOperationBatch(IReadOnlyList<int>? activeNodeNumbers = null)
{
var incomingTable = new DbObjectName(_database.SchemaName, DatabaseConstants.IncomingTable);
var now = DateTimeOffset.UtcNow;
Expand All @@ -169,9 +183,18 @@ private IDatabaseOperation[] buildOperationBatch()
new MoveReplayableErrorMessagesToIncomingOperation(_database)
];

if (_database.Settings.Role == MessageStoreRole.Main && isTimeToPruneNodeEventRecords())
if (_database.Settings.Role == MessageStoreRole.Main)
{
ops.Add(new ReleaseOrphanedMessagesOperation(_database));

if (isTimeToPruneNodeEventRecords())
{
ops.Add(new DeleteOldNodeEventRecords(_database, _settings));
}
}
else if (activeNodeNumbers is { Count: > 0 })
{
ops.Add(new DeleteOldNodeEventRecords(_database, _settings));
ops.Add(new ReleaseOrphanedMessagesForAncillaryOperation(_database, activeNodeNumbers));
}

if (_runtime.Options.Durability.OutboxStaleTime.HasValue)
Expand Down
Loading