diff --git a/src/Persistence/Wolverine.RDBMS/Durability/ReleaseOrphanedMessagesForAncillaryOperation.cs b/src/Persistence/Wolverine.RDBMS/Durability/ReleaseOrphanedMessagesForAncillaryOperation.cs new file mode 100644 index 000000000..0fab09f6a --- /dev/null +++ b/src/Persistence/Wolverine.RDBMS/Durability/ReleaseOrphanedMessagesForAncillaryOperation.cs @@ -0,0 +1,52 @@ +using System.Data.Common; +using JasperFx.Core; +using Weasel.Core; +using Wolverine.RDBMS.Polling; +using Wolverine.Runtime.Agents; +using DbCommandBuilder = Weasel.Core.DbCommandBuilder; + +namespace Wolverine.RDBMS.Durability; + +/// +/// Releases inbox/outbox messages owned by nodes that no longer exist. +/// Used for ancillary/tenant databases that lack the wolverine_nodes table, +/// so the active node list must be provided externally. +/// +internal class ReleaseOrphanedMessagesForAncillaryOperation : IDatabaseOperation, IDoNotReturnData +{ + private readonly IMessageDatabase _database; + private readonly IReadOnlyList _activeNodeNumbers; + + public ReleaseOrphanedMessagesForAncillaryOperation(IMessageDatabase database, IReadOnlyList activeNodeNumbers) + { + _database = database; + _activeNodeNumbers = activeNodeNumbers; + } + + public string Description => "Release inbox/outbox messages owned by nodes that no longer exist (ancillary database)"; + + public void ConfigureCommand(DbCommandBuilder builder) + { + if (_activeNodeNumbers.Count == 0) return; + + var schemaName = _database.SchemaName; + var incomingTable = new DbObjectName(schemaName, DatabaseConstants.IncomingTable); + var outgoingTable = new DbObjectName(schemaName, DatabaseConstants.OutgoingTable); + var nodeList = string.Join(", ", _activeNodeNumbers); + + builder.Append( + $"update {incomingTable} set {DatabaseConstants.OwnerId} = 0 where {DatabaseConstants.OwnerId} != 0 and {DatabaseConstants.OwnerId} not in ({nodeList});"); + builder.Append( + $"update {outgoingTable} set {DatabaseConstants.OwnerId} = 0 where {DatabaseConstants.OwnerId} != 0 and {DatabaseConstants.OwnerId} not in ({nodeList});"); + } + + public Task ReadResultsAsync(DbDataReader reader, IList exceptions, CancellationToken token) + { + return Task.CompletedTask; + } + + public IEnumerable PostProcessingCommands() + { + yield break; + } +} diff --git a/src/Persistence/Wolverine.RDBMS/Durability/ReleaseOrphanedMessagesOperation.cs b/src/Persistence/Wolverine.RDBMS/Durability/ReleaseOrphanedMessagesOperation.cs new file mode 100644 index 000000000..1d2639b9d --- /dev/null +++ b/src/Persistence/Wolverine.RDBMS/Durability/ReleaseOrphanedMessagesOperation.cs @@ -0,0 +1,46 @@ +using System.Data.Common; +using Weasel.Core; +using Wolverine.RDBMS.Polling; +using Wolverine.Runtime.Agents; +using DbCommandBuilder = Weasel.Core.DbCommandBuilder; + +namespace Wolverine.RDBMS.Durability; + +/// +/// Releases inbox/outbox messages owned by nodes that no longer exist in the wolverine_nodes table. +/// Used for main databases where the nodes table is co-located. +/// +internal class ReleaseOrphanedMessagesOperation : IDatabaseOperation, IDoNotReturnData +{ + private readonly IMessageDatabase _database; + + public ReleaseOrphanedMessagesOperation(IMessageDatabase database) + { + _database = database; + } + + public string Description => "Release inbox/outbox messages owned by nodes that no longer exist"; + + public void ConfigureCommand(DbCommandBuilder builder) + { + var schemaName = _database.SchemaName; + var incomingTable = new DbObjectName(schemaName, DatabaseConstants.IncomingTable); + var outgoingTable = new DbObjectName(schemaName, DatabaseConstants.OutgoingTable); + var nodesTable = new DbObjectName(schemaName, DatabaseConstants.NodeTableName); + + builder.Append( + $"update {incomingTable} set {DatabaseConstants.OwnerId} = 0 where {DatabaseConstants.OwnerId} != 0 and {DatabaseConstants.OwnerId} not in (select {DatabaseConstants.NodeNumber} from {nodesTable});"); + builder.Append( + $"update {outgoingTable} set {DatabaseConstants.OwnerId} = 0 where {DatabaseConstants.OwnerId} != 0 and {DatabaseConstants.OwnerId} not in (select {DatabaseConstants.NodeNumber} from {nodesTable});"); + } + + public Task ReadResultsAsync(DbDataReader reader, IList exceptions, CancellationToken token) + { + return Task.CompletedTask; + } + + public IEnumerable PostProcessingCommands() + { + yield break; + } +} diff --git a/src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs b/src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs index f87cf9cb2..e1379af18 100644 --- a/src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs +++ b/src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs @@ -76,9 +76,23 @@ public Task StartAsync(CancellationToken cancellationToken) var recoveryStart = _settings.ScheduledJobFirstExecution.Add(new Random().Next(0, 1000).Milliseconds()); - _recoveryTimer = new Timer(_ => + _recoveryTimer = new Timer(async _ => { - var operations = buildOperationBatch(); + IReadOnlyList? activeNodeNumbers = null; + if (_database.Settings.Role != MessageStoreRole.Main) + { + try + { + var nodes = await _runtime.Storage.Nodes.LoadAllNodesAsync(_runtime.Cancellation); + activeNodeNumbers = nodes.Select(n => n.AssignedNodeNumber).ToList(); + } + catch (Exception e) + { + _logger.LogDebug(e, "Failed to load active nodes for orphaned message detection"); + } + } + + var operations = buildOperationBatch(activeNodeNumbers); var batch = new DatabaseOperationBatch(_database, operations); _runningBlock.Post(batch); @@ -156,7 +170,7 @@ private bool isTimeToPruneNodeEventRecords() return false; } - private IDatabaseOperation[] buildOperationBatch() + private IDatabaseOperation[] buildOperationBatch(IReadOnlyList? activeNodeNumbers = null) { var incomingTable = new DbObjectName(_database.SchemaName, DatabaseConstants.IncomingTable); var now = DateTimeOffset.UtcNow; @@ -169,9 +183,18 @@ private IDatabaseOperation[] buildOperationBatch() new MoveReplayableErrorMessagesToIncomingOperation(_database) ]; - if (_database.Settings.Role == MessageStoreRole.Main && isTimeToPruneNodeEventRecords()) + if (_database.Settings.Role == MessageStoreRole.Main) + { + ops.Add(new ReleaseOrphanedMessagesOperation(_database)); + + if (isTimeToPruneNodeEventRecords()) + { + ops.Add(new DeleteOldNodeEventRecords(_database, _settings)); + } + } + else if (activeNodeNumbers is { Count: > 0 }) { - ops.Add(new DeleteOldNodeEventRecords(_database, _settings)); + ops.Add(new ReleaseOrphanedMessagesForAncillaryOperation(_database, activeNodeNumbers)); } if (_runtime.Options.Durability.OutboxStaleTime.HasValue)