Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions src/Persistence/PostgresqlTests/PostgresqlMessageStoreTests.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
using IntegrationTests;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Marten;
using Microsoft.Extensions.Hosting;
using Npgsql;
using Shouldly;
using Weasel.Core;
using Wolverine;
Expand All @@ -10,6 +12,7 @@
using Wolverine.RDBMS;
using Wolverine.RDBMS.Durability;
using Wolverine.RDBMS.Polling;
using Wolverine.Runtime.Agents;
using Wolverine.Transports.Tcp;

namespace PostgresqlTests;
Expand Down Expand Up @@ -61,6 +64,57 @@ public async Task delete_expired_envelopes()
// Now, let's set the keep until in the past
}

[Fact]
public async Task delete_old_log_node_records()
{
var nodeRecord1 = new NodeRecord()
{
AgentUri = new Uri("fake://one"),
Description = "Started",
Id = Guid.NewGuid().ToString(),
NodeNumber = 1,
RecordType = NodeRecordType.AgentStarted, ServiceName = "MyService",
Timestamp = DateTimeOffset.UtcNow.Subtract(10.Days())
};

var nodeRecord2 = new NodeRecord()
{
AgentUri = new Uri("fake://one"),
Description = "Started",
Id = Guid.NewGuid().ToString(),
NodeNumber = 2,
RecordType = NodeRecordType.AgentStarted, ServiceName = "MyService",
Timestamp = DateTimeOffset.UtcNow.Subtract(4.Days())
};

var messageDatabase = thePersistence.As<IMessageDatabase>();
var log = new PersistNodeRecord(messageDatabase.Settings, [nodeRecord1, nodeRecord2]);
await theHost.InvokeAsync(new DatabaseOperationBatch(messageDatabase, [log]));

using var conn = new NpgsqlConnection(Servers.PostgresConnectionString);
await conn.OpenAsync();
await conn.CreateCommand(
$"update receiver.{DatabaseConstants.NodeRecordTableName} set timestamp = :time where node_number = 2")
.With("time", DateTimeOffset.UtcNow.Subtract(10.Days()))
.ExecuteNonQueryAsync();
await conn.CloseAsync();

var recent2 = await thePersistence.Nodes.FetchRecentRecordsAsync(100);

recent2.Any().ShouldBeTrue();

var op = new DeleteOldNodeEventRecords((IMessageDatabase)thePersistence,
new DurabilitySettings { NodeEventRecordExpirationTime = 5.Days() });

var batch = new DatabaseOperationBatch((IMessageDatabase)thePersistence, [op]);
await theHost.InvokeAsync(batch);

var recent = await thePersistence.Nodes.FetchRecentRecordsAsync(100);

recent.Any().ShouldBeTrue();
recent.Any(x => x.Id == nodeRecord2.Id).ShouldBeFalse();
}

[Fact]
public async Task move_replayable_error_messages_to_incoming()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using IntegrationTests;
using JasperFx.Core;
using JasperFx.Core.Reflection;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging.Abstractions;
Expand All @@ -12,6 +13,7 @@
using Wolverine.RDBMS;
using Wolverine.RDBMS.Durability;
using Wolverine.RDBMS.Polling;
using Wolverine.Runtime.Agents;
using Wolverine.Runtime.WorkerQueues;
using Wolverine.SqlServer;
using Wolverine.Tracking;
Expand Down Expand Up @@ -148,5 +150,57 @@ await thePersistence.As<IMessageDatabase>().PollForScheduledMessagesAsync(theRec
stored.OwnerId.ShouldBe(durabilitySettings.AssignedNodeNumber);
stored.Status.ShouldBe(EnvelopeStatus.Incoming);
}

[Fact]
public async Task delete_old_log_node_records()
{
var nodeRecord1 = new NodeRecord()
{
AgentUri = new Uri("fake://one"),
Description = "Started",
Id = Guid.NewGuid().ToString(),
NodeNumber = 1,
RecordType = NodeRecordType.AgentStarted, ServiceName = "MyService",
Timestamp = DateTimeOffset.UtcNow.Subtract(10.Days())
};

var nodeRecord2 = new NodeRecord()
{
AgentUri = new Uri("fake://one"),
Description = "Started",
Id = Guid.NewGuid().ToString(),
NodeNumber = 2,
RecordType = NodeRecordType.AgentStarted, ServiceName = "MyService",
Timestamp = DateTimeOffset.UtcNow.Subtract(4.Days())
};

var messageDatabase = thePersistence.As<IMessageDatabase>();
var log = new PersistNodeRecord(messageDatabase.Settings, [nodeRecord1, nodeRecord2]);
await theHost.InvokeAsync(new DatabaseOperationBatch(messageDatabase, [log]));

using var conn = new SqlConnection(Servers.SqlServerConnectionString);
await conn.OpenAsync();
await conn.CreateCommand(
$"update receiver.{DatabaseConstants.NodeRecordTableName} set timestamp = @time where node_number = 2")
.With("time", DateTimeOffset.UtcNow.Subtract(10.Days()))
.ExecuteNonQueryAsync();
await conn.CloseAsync();

var recent2 = await thePersistence.Nodes.FetchRecentRecordsAsync(100);

recent2.Any().ShouldBeTrue();

var op = new DeleteOldNodeEventRecords((IMessageDatabase)thePersistence,
new DurabilitySettings { NodeEventRecordExpirationTime = 5.Days() });

var batch = new DatabaseOperationBatch((IMessageDatabase)thePersistence, [op]);
await theHost.InvokeAsync(batch);

var recent = await thePersistence.Nodes.FetchRecentRecordsAsync(100);

recent.Any().ShouldBeTrue();
recent.Any(x => x.Id == nodeRecord2.Id).ShouldBeFalse();
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
using System.Data.Common;
using Wolverine.RDBMS.Polling;
using Wolverine.Runtime.Agents;
using DbCommandBuilder = Weasel.Core.DbCommandBuilder;

namespace Wolverine.RDBMS.Durability;

internal class DeleteOldNodeEventRecords : IDatabaseOperation, IDoNotReturnData
{
private readonly IMessageDatabase _database;
private readonly DurabilitySettings _settings;

public DeleteOldNodeEventRecords(IMessageDatabase database, DurabilitySettings settings)
{
_database = database ?? throw new ArgumentNullException(nameof(database));
if (!_database.Settings.IsMain)
{
throw new ArgumentOutOfRangeException(nameof(database), "This operation is only valid on 'Main' databases");
}

_settings = settings;
}

public string Description { get; } = "Deleting expired node event records";
public void ConfigureCommand(DbCommandBuilder builder)
{
var cutoffTime = DateTimeOffset.UtcNow.Subtract(_settings.NodeEventRecordExpirationTime);

builder.Append($"delete from {_database.SchemaName}.{DatabaseConstants.NodeRecordTableName} where timestamp < ");
builder.AppendParameter(cutoffTime);
builder.Append(';');
}

public Task ReadResultsAsync(DbDataReader reader, IList<Exception> exceptions, CancellationToken token)
{
return Task.CompletedTask;
}

public IEnumerable<IAgentCommand> PostProcessingCommands()
{
yield break;
}
}
45 changes: 37 additions & 8 deletions src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,7 @@ public Task StartAsync(CancellationToken cancellationToken)

_recoveryTimer = new Timer(_ =>
{
var operations = new IDatabaseOperation[]
{
new CheckRecoverableIncomingMessagesOperation(_database, _runtime.Endpoints, _settings, _logger),
new CheckRecoverableOutgoingMessagesOperation(_database, _runtime, _logger),
new DeleteExpiredEnvelopesOperation(
new DbObjectName(_database.SchemaName, DatabaseConstants.IncomingTable), DateTimeOffset.UtcNow),
new MoveReplayableErrorMessagesToIncomingOperation(_database)
};
var operations = buildOperationBatch();

var batch = new DatabaseOperationBatch(_database, operations);
_runningBlock.Post(batch);
Expand Down Expand Up @@ -126,6 +119,42 @@ public Task StartAsync(CancellationToken cancellationToken)
return Task.CompletedTask;
}

private DateTimeOffset? _lastNodeRecordPruneTime;

private bool isTimeToPruneNodeEventRecords()
{
if (_lastNodeRecordPruneTime == null) return true;

if (DateTimeOffset.UtcNow.Subtract(_lastNodeRecordPruneTime.Value) > 1.Hours()) return true;

return false;
}

private IDatabaseOperation[] buildOperationBatch()
{
if (_database.Settings.IsMain && isTimeToPruneNodeEventRecords())
{
return
[
new CheckRecoverableIncomingMessagesOperation(_database, _runtime.Endpoints, _settings, _logger),
new CheckRecoverableOutgoingMessagesOperation(_database, _runtime, _logger),
new DeleteExpiredEnvelopesOperation(
new DbObjectName(_database.SchemaName, DatabaseConstants.IncomingTable), DateTimeOffset.UtcNow),
new MoveReplayableErrorMessagesToIncomingOperation(_database),
new DeleteOldNodeEventRecords(_database, _settings)
];
}

return
[
new CheckRecoverableIncomingMessagesOperation(_database, _runtime.Endpoints, _settings, _logger),
new CheckRecoverableOutgoingMessagesOperation(_database, _runtime, _logger),
new DeleteExpiredEnvelopesOperation(
new DbObjectName(_database.SchemaName, DatabaseConstants.IncomingTable), DateTimeOffset.UtcNow),
new MoveReplayableErrorMessagesToIncomingOperation(_database)
];
}

public void StartScheduledJobPolling()
{
_scheduledJobTimer =
Expand Down
6 changes: 6 additions & 0 deletions src/Wolverine/DurabilitySettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,12 @@ internal set
/// </summary>
public bool DeadLetterQueueExpirationEnabled { get; set; }

/// <summary>
/// Configurable time to keep records in the wolverine_node_records storage (or equivalent) for node records.
/// Default is 5 days
/// </summary>
public TimeSpan NodeEventRecordExpirationTime { get; set; } = 5.Days();

/// <summary>
/// Get or set the logical Wolverine service name. By default, this is
/// derived from the name of a custom WolverineOptions
Expand Down
Loading