Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<NoWarn>1570;1571;1572;1573;1574;1587;1591;1701;1702;1711;1735;0618</NoWarn>
<ImplicitUsings>true</ImplicitUsings>
<Nullable>enable</Nullable>
<Version>5.19.0</Version>
<Version>5.19.1</Version>
<RepositoryUrl>$(PackageProjectUrl)</RepositoryUrl>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
using IntegrationTests;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Wolverine;
using Wolverine.ComplianceTests;
using Wolverine.RDBMS;
using Wolverine.RDBMS.Durability;
using Wolverine.RDBMS.Polling;
using Wolverine.Runtime;
using Wolverine.SqlServer;
using Wolverine.Tracking;

namespace SqlServerTests;

public class solo_mode_does_not_release_orphaned_messages : IAsyncLifetime
{
private IHost theSoloHost;
private IHost theBalancedHost;
private const string SoloSchema = "solo_orphan";
private const string BalancedSchema = "balanced_orphan";

public async Task InitializeAsync()
{
theSoloHost = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.PersistMessagesWithSqlServer(Servers.SqlServerConnectionString, SoloSchema);
opts.Durability.Mode = DurabilityMode.Solo;
}).StartAsync();

await theSoloHost.RebuildAllEnvelopeStorageAsync();

theBalancedHost = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.PersistMessagesWithSqlServer(Servers.SqlServerConnectionString, BalancedSchema);
opts.Durability.Mode = DurabilityMode.Balanced;
}).StartAsync();

await theBalancedHost.RebuildAllEnvelopeStorageAsync();
}

public async Task DisposeAsync()
{
await theSoloHost.StopAsync();
await theBalancedHost.StopAsync();
}

[Fact]
public void solo_mode_build_operation_batch_excludes_orphaned_release()
{
var runtime = theSoloHost.GetRuntime();
var database = (IMessageDatabase)runtime.Storage;

var agent = new DurabilityAgent(runtime, database);

var operations = agent.buildOperationBatch();

// Should NOT contain any orphaned message release operations in Solo mode
operations.ShouldNotContain(op => op is ReleaseOrphanedMessagesOperation);
operations.ShouldNotContain(op => op is ReleaseOrphanedMessagesForAncillaryOperation);
}

[Fact]
public void solo_mode_build_operation_batch_excludes_orphaned_release_even_with_active_nodes()
{
var runtime = theSoloHost.GetRuntime();
var database = (IMessageDatabase)runtime.Storage;

var agent = new DurabilityAgent(runtime, database);

var operations = agent.buildOperationBatch(activeNodeNumbers: new List<int> { 1, 2, 3 });

// Should NOT contain any orphaned message release operations in Solo mode
operations.ShouldNotContain(op => op is ReleaseOrphanedMessagesOperation);
operations.ShouldNotContain(op => op is ReleaseOrphanedMessagesForAncillaryOperation);
}

[Fact]
public void balanced_mode_build_operation_batch_includes_orphaned_release()
{
var runtime = theBalancedHost.GetRuntime();
var database = (IMessageDatabase)runtime.Storage;

var agent = new DurabilityAgent(runtime, database);

var operations = agent.buildOperationBatch();

// SHOULD contain the orphaned message release operation for main databases in Balanced mode
operations.ShouldContain(op => op is ReleaseOrphanedMessagesOperation);
}
}
22 changes: 14 additions & 8 deletions src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public Task StartAsync(CancellationToken cancellationToken)
_recoveryTimer = new Timer(async _ =>
{
IReadOnlyList<int>? activeNodeNumbers = null;
if (_database.Settings.Role != MessageStoreRole.Main)
if (_settings.Mode != DurabilityMode.Solo && _database.Settings.Role != MessageStoreRole.Main)
{
try
{
Expand Down Expand Up @@ -170,7 +170,7 @@ private bool isTimeToPruneNodeEventRecords()
return false;
}

private IDatabaseOperation[] buildOperationBatch(IReadOnlyList<int>? activeNodeNumbers = null)
internal IDatabaseOperation[] buildOperationBatch(IReadOnlyList<int>? activeNodeNumbers = null)
{
var incomingTable = new DbObjectName(_database.SchemaName, DatabaseConstants.IncomingTable);
var now = DateTimeOffset.UtcNow;
Expand All @@ -183,19 +183,25 @@ private IDatabaseOperation[] buildOperationBatch(IReadOnlyList<int>? activeNodeN
new MoveReplayableErrorMessagesToIncomingOperation(_database)
];

if (_database.Settings.Role == MessageStoreRole.Main)
if (_settings.Mode != DurabilityMode.Solo)
{
ops.Add(new ReleaseOrphanedMessagesOperation(_database));
if (_database.Settings.Role == MessageStoreRole.Main)
{
ops.Add(new ReleaseOrphanedMessagesOperation(_database));
}
else if (activeNodeNumbers is { Count: > 0 })
{
ops.Add(new ReleaseOrphanedMessagesForAncillaryOperation(_database, activeNodeNumbers));
}
}

if (_database.Settings.Role == MessageStoreRole.Main)
{
if (isTimeToPruneNodeEventRecords())
{
ops.Add(new DeleteOldNodeEventRecords(_database, _settings));
}
}
else if (activeNodeNumbers is { Count: > 0 })
{
ops.Add(new ReleaseOrphanedMessagesForAncillaryOperation(_database, activeNodeNumbers));
}

if (_runtime.Options.Durability.OutboxStaleTime.HasValue)
{
Expand Down
Loading