diff --git a/src/Persistence/PostgresqlTests/PostgresqlMessageStoreTests.cs b/src/Persistence/PostgresqlTests/PostgresqlMessageStoreTests.cs index c2c43dc92..5a992f104 100644 --- a/src/Persistence/PostgresqlTests/PostgresqlMessageStoreTests.cs +++ b/src/Persistence/PostgresqlTests/PostgresqlMessageStoreTests.cs @@ -1,7 +1,9 @@ using IntegrationTests; using JasperFx.Core; +using JasperFx.Core.Reflection; using Marten; using Microsoft.Extensions.Hosting; +using Npgsql; using Shouldly; using Weasel.Core; using Wolverine; @@ -10,6 +12,7 @@ using Wolverine.RDBMS; using Wolverine.RDBMS.Durability; using Wolverine.RDBMS.Polling; +using Wolverine.Runtime.Agents; using Wolverine.Transports.Tcp; namespace PostgresqlTests; @@ -61,6 +64,57 @@ public async Task delete_expired_envelopes() // Now, let's set the keep until in the past } + [Fact] + public async Task delete_old_log_node_records() + { + var nodeRecord1 = new NodeRecord() + { + AgentUri = new Uri("fake://one"), + Description = "Started", + Id = Guid.NewGuid().ToString(), + NodeNumber = 1, + RecordType = NodeRecordType.AgentStarted, ServiceName = "MyService", + Timestamp = DateTimeOffset.UtcNow.Subtract(10.Days()) + }; + + var nodeRecord2 = new NodeRecord() + { + AgentUri = new Uri("fake://one"), + Description = "Started", + Id = Guid.NewGuid().ToString(), + NodeNumber = 2, + RecordType = NodeRecordType.AgentStarted, ServiceName = "MyService", + Timestamp = DateTimeOffset.UtcNow.Subtract(4.Days()) + }; + + var messageDatabase = thePersistence.As(); + var log = new PersistNodeRecord(messageDatabase.Settings, [nodeRecord1, nodeRecord2]); + await theHost.InvokeAsync(new DatabaseOperationBatch(messageDatabase, [log])); + + using var conn = new NpgsqlConnection(Servers.PostgresConnectionString); + await conn.OpenAsync(); + await conn.CreateCommand( + $"update receiver.{DatabaseConstants.NodeRecordTableName} set timestamp = :time where node_number = 2") + .With("time", DateTimeOffset.UtcNow.Subtract(10.Days())) + .ExecuteNonQueryAsync(); + await conn.CloseAsync(); + + var recent2 = await thePersistence.Nodes.FetchRecentRecordsAsync(100); + + recent2.Any().ShouldBeTrue(); + + var op = new DeleteOldNodeEventRecords((IMessageDatabase)thePersistence, + new DurabilitySettings { NodeEventRecordExpirationTime = 5.Days() }); + + var batch = new DatabaseOperationBatch((IMessageDatabase)thePersistence, [op]); + await theHost.InvokeAsync(batch); + + var recent = await thePersistence.Nodes.FetchRecentRecordsAsync(100); + + recent.Any().ShouldBeTrue(); + recent.Any(x => x.Id == nodeRecord2.Id).ShouldBeFalse(); + } + [Fact] public async Task move_replayable_error_messages_to_incoming() { diff --git a/src/Persistence/SqlServerTests/Persistence/SqlServerMessageStoreTests.cs b/src/Persistence/SqlServerTests/Persistence/SqlServerMessageStoreTests.cs index 83c71d114..733001692 100644 --- a/src/Persistence/SqlServerTests/Persistence/SqlServerMessageStoreTests.cs +++ b/src/Persistence/SqlServerTests/Persistence/SqlServerMessageStoreTests.cs @@ -1,6 +1,7 @@ using IntegrationTests; using JasperFx.Core; using JasperFx.Core.Reflection; +using Microsoft.Data.SqlClient; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging.Abstractions; @@ -12,6 +13,7 @@ using Wolverine.RDBMS; using Wolverine.RDBMS.Durability; using Wolverine.RDBMS.Polling; +using Wolverine.Runtime.Agents; using Wolverine.Runtime.WorkerQueues; using Wolverine.SqlServer; using Wolverine.Tracking; @@ -148,5 +150,57 @@ await thePersistence.As().PollForScheduledMessagesAsync(theRec stored.OwnerId.ShouldBe(durabilitySettings.AssignedNodeNumber); stored.Status.ShouldBe(EnvelopeStatus.Incoming); } + + [Fact] + public async Task delete_old_log_node_records() + { + var nodeRecord1 = new NodeRecord() + { + AgentUri = new Uri("fake://one"), + Description = "Started", + Id = Guid.NewGuid().ToString(), + NodeNumber = 1, + RecordType = NodeRecordType.AgentStarted, ServiceName = "MyService", + Timestamp = DateTimeOffset.UtcNow.Subtract(10.Days()) + }; + + var nodeRecord2 = new NodeRecord() + { + AgentUri = new Uri("fake://one"), + Description = "Started", + Id = Guid.NewGuid().ToString(), + NodeNumber = 2, + RecordType = NodeRecordType.AgentStarted, ServiceName = "MyService", + Timestamp = DateTimeOffset.UtcNow.Subtract(4.Days()) + }; + + var messageDatabase = thePersistence.As(); + var log = new PersistNodeRecord(messageDatabase.Settings, [nodeRecord1, nodeRecord2]); + await theHost.InvokeAsync(new DatabaseOperationBatch(messageDatabase, [log])); + + using var conn = new SqlConnection(Servers.SqlServerConnectionString); + await conn.OpenAsync(); + await conn.CreateCommand( + $"update receiver.{DatabaseConstants.NodeRecordTableName} set timestamp = @time where node_number = 2") + .With("time", DateTimeOffset.UtcNow.Subtract(10.Days())) + .ExecuteNonQueryAsync(); + await conn.CloseAsync(); + + var recent2 = await thePersistence.Nodes.FetchRecentRecordsAsync(100); + + recent2.Any().ShouldBeTrue(); + + var op = new DeleteOldNodeEventRecords((IMessageDatabase)thePersistence, + new DurabilitySettings { NodeEventRecordExpirationTime = 5.Days() }); + + var batch = new DatabaseOperationBatch((IMessageDatabase)thePersistence, [op]); + await theHost.InvokeAsync(batch); + + var recent = await thePersistence.Nodes.FetchRecentRecordsAsync(100); + + recent.Any().ShouldBeTrue(); + recent.Any(x => x.Id == nodeRecord2.Id).ShouldBeFalse(); + } + } \ No newline at end of file diff --git a/src/Persistence/Wolverine.RDBMS/Durability/DeleteOldNodeEventRecords.cs b/src/Persistence/Wolverine.RDBMS/Durability/DeleteOldNodeEventRecords.cs new file mode 100644 index 000000000..aec350ca8 --- /dev/null +++ b/src/Persistence/Wolverine.RDBMS/Durability/DeleteOldNodeEventRecords.cs @@ -0,0 +1,43 @@ +using System.Data.Common; +using Wolverine.RDBMS.Polling; +using Wolverine.Runtime.Agents; +using DbCommandBuilder = Weasel.Core.DbCommandBuilder; + +namespace Wolverine.RDBMS.Durability; + +internal class DeleteOldNodeEventRecords : IDatabaseOperation, IDoNotReturnData +{ + private readonly IMessageDatabase _database; + private readonly DurabilitySettings _settings; + + public DeleteOldNodeEventRecords(IMessageDatabase database, DurabilitySettings settings) + { + _database = database ?? throw new ArgumentNullException(nameof(database)); + if (!_database.Settings.IsMain) + { + throw new ArgumentOutOfRangeException(nameof(database), "This operation is only valid on 'Main' databases"); + } + + _settings = settings; + } + + public string Description { get; } = "Deleting expired node event records"; + public void ConfigureCommand(DbCommandBuilder builder) + { + var cutoffTime = DateTimeOffset.UtcNow.Subtract(_settings.NodeEventRecordExpirationTime); + + builder.Append($"delete from {_database.SchemaName}.{DatabaseConstants.NodeRecordTableName} where timestamp < "); + builder.AppendParameter(cutoffTime); + builder.Append(';'); + } + + public Task ReadResultsAsync(DbDataReader reader, IList exceptions, CancellationToken token) + { + return Task.CompletedTask; + } + + public IEnumerable PostProcessingCommands() + { + yield break; + } +} \ No newline at end of file diff --git a/src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs b/src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs index 62c352909..6efb8a342 100644 --- a/src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs +++ b/src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs @@ -91,14 +91,7 @@ public Task StartAsync(CancellationToken cancellationToken) _recoveryTimer = new Timer(_ => { - var operations = new IDatabaseOperation[] - { - new CheckRecoverableIncomingMessagesOperation(_database, _runtime.Endpoints, _settings, _logger), - new CheckRecoverableOutgoingMessagesOperation(_database, _runtime, _logger), - new DeleteExpiredEnvelopesOperation( - new DbObjectName(_database.SchemaName, DatabaseConstants.IncomingTable), DateTimeOffset.UtcNow), - new MoveReplayableErrorMessagesToIncomingOperation(_database) - }; + var operations = buildOperationBatch(); var batch = new DatabaseOperationBatch(_database, operations); _runningBlock.Post(batch); @@ -126,6 +119,42 @@ public Task StartAsync(CancellationToken cancellationToken) return Task.CompletedTask; } + private DateTimeOffset? _lastNodeRecordPruneTime; + + private bool isTimeToPruneNodeEventRecords() + { + if (_lastNodeRecordPruneTime == null) return true; + + if (DateTimeOffset.UtcNow.Subtract(_lastNodeRecordPruneTime.Value) > 1.Hours()) return true; + + return false; + } + + private IDatabaseOperation[] buildOperationBatch() + { + if (_database.Settings.IsMain && isTimeToPruneNodeEventRecords()) + { + return + [ + new CheckRecoverableIncomingMessagesOperation(_database, _runtime.Endpoints, _settings, _logger), + new CheckRecoverableOutgoingMessagesOperation(_database, _runtime, _logger), + new DeleteExpiredEnvelopesOperation( + new DbObjectName(_database.SchemaName, DatabaseConstants.IncomingTable), DateTimeOffset.UtcNow), + new MoveReplayableErrorMessagesToIncomingOperation(_database), + new DeleteOldNodeEventRecords(_database, _settings) + ]; + } + + return + [ + new CheckRecoverableIncomingMessagesOperation(_database, _runtime.Endpoints, _settings, _logger), + new CheckRecoverableOutgoingMessagesOperation(_database, _runtime, _logger), + new DeleteExpiredEnvelopesOperation( + new DbObjectName(_database.SchemaName, DatabaseConstants.IncomingTable), DateTimeOffset.UtcNow), + new MoveReplayableErrorMessagesToIncomingOperation(_database) + ]; + } + public void StartScheduledJobPolling() { _scheduledJobTimer = diff --git a/src/Wolverine/DurabilitySettings.cs b/src/Wolverine/DurabilitySettings.cs index 64b8d4c9c..14181db7e 100644 --- a/src/Wolverine/DurabilitySettings.cs +++ b/src/Wolverine/DurabilitySettings.cs @@ -195,6 +195,12 @@ internal set /// public bool DeadLetterQueueExpirationEnabled { get; set; } + /// + /// Configurable time to keep records in the wolverine_node_records storage (or equivalent) for node records. + /// Default is 5 days + /// + public TimeSpan NodeEventRecordExpirationTime { get; set; } = 5.Days(); + /// /// Get or set the logical Wolverine service name. By default, this is /// derived from the name of a custom WolverineOptions