Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/guide/durability/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ using var host = await Host.CreateDefaultBuilder()
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Persistence/PersistenceTests/Samples/DocumentationSamples.cs#L53-L63' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_make_all_subscribers_be_durable' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

### Bumping out Stale Outbox Messages <Badge type="tip" text="5.2" />
### Bumping out Stale Inbox/Outbox Messages <Badge type="tip" text="5.2" />

It should *not* be possible for there to be any path where a message gets "stuck" in the outbox tables without eventually
being sent by the originating node or recovered by a different node if the original node goes down first. However, it's
Expand All @@ -151,12 +151,12 @@ you can "bump" a persisted record in the `wolverine_outgoing_envelopes` to be re
setting the `owner_id` field to zero.

::: info
Just be aware that opting into the `OutboxStaleTime` threshold will require database changes through Wolverine's database
Just be aware that opting into the `OutboxStaleTime` or `InboxStaleTime` threshold will require database changes through Wolverine's database
migration subsystem
:::

You also have this setting to force Wolverine to automatically "bump" and older messages that seem to be stalled in
the outbox table:
the outbox table or the inbox table:

<!-- snippet: sample_configuring_outbox_stale_timeout -->
<a id='snippet-sample_configuring_outbox_stale_timeout'></a>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,12 @@ public static async Task options_for_bumping_stale_outbox_messages()
// so that the durability agent can recover it and force
// it to be sent
opts.Durability.OutboxStaleTime = 1.Hours();

// Same for the inbox, but it's configured independently
// This should *never* be necessary and the Wolverine
// team has no clue why this could ever happen and a message
// could get "stuck", but yet, here this is:
opts.Durability.InboxStaleTime = 10.Minutes();
}).StartAsync();

#endregion
Expand Down
107 changes: 107 additions & 0 deletions src/Persistence/PostgresqlTests/bumping_stale_inbox_messages.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
using IntegrationTests;
using JasperFx.Core;
using Microsoft.Extensions.Hosting;
using Npgsql;
using Shouldly;
using Weasel.Core;
using Weasel.Postgresql.Tables;
using Wolverine;
using Wolverine.ComplianceTests;
using Wolverine.Postgresql;
using Wolverine.RDBMS;
using Wolverine.RDBMS.Durability;
using Wolverine.RDBMS.Polling;
using Wolverine.Runtime;
using Wolverine.Tracking;

namespace PostgresqlTests;

public class bumping_stale_inbox_messages : IAsyncLifetime
{
private IHost theHost;

public async Task InitializeAsync()
{
theHost = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, "stale_outbox");
opts.Durability.InboxStaleTime = 1.Hours();
}).StartAsync();

await theHost.RebuildAllEnvelopeStorageAsync();
}

public async Task DisposeAsync()
{
await theHost.StopAsync();
}

[Fact]
public async Task got_the_right_column()
{
using var conn = new NpgsqlConnection(Servers.PostgresConnectionString);
await conn.OpenAsync();

var table = await new Table(new DbObjectName("stale_outbox", DatabaseConstants.IncomingTable)).FetchExistingAsync(conn);

table.HasColumn(DatabaseConstants.Timestamp).ShouldBeTrue();

}

[Fact]
public async Task smoke_test_on_the_persist()
{
var envelope = ObjectMother.Envelope();
var messageStore = theHost.GetRuntime().Storage;
await messageStore.Inbox.StoreIncomingAsync(envelope);
}

[Fact]
public async Task using_the_operation()
{
var envelope1 = ObjectMother.Envelope();
var envelope2 = ObjectMother.Envelope();
var envelope3 = ObjectMother.Envelope();
var envelope4 = ObjectMother.Envelope();
var envelope5 = ObjectMother.Envelope();


var messageStore = theHost.GetRuntime().Storage;
await messageStore.Inbox.StoreIncomingAsync(envelope1);
await messageStore.Inbox.StoreIncomingAsync(envelope2);
await messageStore.Inbox.StoreIncomingAsync(envelope3);
await messageStore.Inbox.StoreIncomingAsync(envelope4);
await messageStore.Inbox.StoreIncomingAsync(envelope5);

using var conn = new NpgsqlConnection(Servers.PostgresConnectionString);
await conn.OpenAsync();
await conn.CreateCommand("update stale_outbox.wolverine_incoming_envelopes set \"timestamp\" = :time where id = :id")
.With("time", DateTimeOffset.UtcNow.Subtract(2.Hours()))
.With("id", envelope1.Id)
.ExecuteNonQueryAsync();

await conn.CreateCommand("update stale_outbox.wolverine_incoming_envelopes set \"timestamp\" = :time where id = :id")
.With("time", DateTimeOffset.UtcNow.Subtract(2.Hours()))
.With("id", envelope3.Id)
.ExecuteNonQueryAsync();

await conn.CreateCommand("update stale_outbox.wolverine_incoming_envelopes set \"timestamp\" = :time where id = :id")
.With("time", DateTimeOffset.UtcNow.Subtract(2.Hours()))
.With("id", envelope5.Id)
.ExecuteNonQueryAsync();

var envelopesBefore = await messageStore.Admin.AllIncomingAsync();
envelopesBefore.Count(x => x.OwnerId == 0).ShouldBe(0);

var operation = new BumpStaleIncomingEnvelopesOperation(
new DbObjectName("stale_outbox", DatabaseConstants.IncomingTable), theHost.GetRuntime().Options.Durability, DateTimeOffset.UtcNow);

var batch = new DatabaseOperationBatch((IMessageDatabase)messageStore, [operation]);
await theHost.InvokeAsync(batch);

var envelopesAfter = await messageStore.Admin.AllIncomingAsync();
envelopesAfter.Count(x => x.OwnerId == 0).ShouldBe(3);

}
}
107 changes: 107 additions & 0 deletions src/Persistence/SqlServerTests/bumping_stale_inbox_messages.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
using IntegrationTests;
using JasperFx.Core;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.Hosting;
using Shouldly;
using Weasel.Core;
using Weasel.SqlServer.Tables;
using Wolverine;
using Wolverine.ComplianceTests;
using Wolverine.RDBMS;
using Wolverine.RDBMS.Durability;
using Wolverine.RDBMS.Polling;
using Wolverine.Runtime;
using Wolverine.SqlServer;
using Wolverine.Tracking;

namespace SqlServerTests;

public class bumping_stale_inbox_messages : IAsyncLifetime
{
private IHost theHost;

public async Task InitializeAsync()
{
theHost = await Host.CreateDefaultBuilder()
.UseWolverine(opts =>
{
opts.PersistMessagesWithSqlServer(Servers.SqlServerConnectionString, "stale_outbox");
opts.Durability.InboxStaleTime = 1.Hours();
}).StartAsync();

await theHost.RebuildAllEnvelopeStorageAsync();
}

public async Task DisposeAsync()
{
await theHost.StopAsync();
}

[Fact]
public async Task got_the_right_column()
{
using var conn = new SqlConnection(Servers.SqlServerConnectionString);
await conn.OpenAsync();

var table = await new Table(new DbObjectName("stale_outbox", DatabaseConstants.IncomingTable)).FetchExistingAsync(conn);

table.HasColumn(DatabaseConstants.Timestamp).ShouldBeTrue();

}

[Fact]
public async Task smoke_test_on_the_persist()
{
var envelope = ObjectMother.Envelope();
var messageStore = theHost.GetRuntime().Storage;
await messageStore.Inbox.StoreIncomingAsync(envelope);
}

[Fact]
public async Task using_the_operation()
{
var envelope1 = ObjectMother.Envelope();
var envelope2 = ObjectMother.Envelope();
var envelope3 = ObjectMother.Envelope();
var envelope4 = ObjectMother.Envelope();
var envelope5 = ObjectMother.Envelope();


var messageStore = theHost.GetRuntime().Storage;
await messageStore.Inbox.StoreIncomingAsync(envelope1);
await messageStore.Inbox.StoreIncomingAsync(envelope2);
await messageStore.Inbox.StoreIncomingAsync(envelope3);
await messageStore.Inbox.StoreIncomingAsync(envelope4);
await messageStore.Inbox.StoreIncomingAsync(envelope5);

using var conn = new SqlConnection(Servers.SqlServerConnectionString);
await conn.OpenAsync();
await conn.CreateCommand("update stale_outbox.wolverine_incoming_envelopes set timestamp = @time where id = @id")
.With("time", DateTimeOffset.UtcNow.Subtract(2.Hours()))
.With("id", envelope1.Id)
.ExecuteNonQueryAsync();

await conn.CreateCommand("update stale_outbox.wolverine_incoming_envelopes set timestamp = @time where id = @id")
.With("time", DateTimeOffset.UtcNow.Subtract(2.Hours()))
.With("id", envelope3.Id)
.ExecuteNonQueryAsync();

await conn.CreateCommand("update stale_outbox.wolverine_incoming_envelopes set timestamp = @time where id = @id")
.With("time", DateTimeOffset.UtcNow.Subtract(2.Hours()))
.With("id", envelope5.Id)
.ExecuteNonQueryAsync();

var envelopesBefore = await messageStore.Admin.AllIncomingAsync();
envelopesBefore.Count(x => x.OwnerId == 0).ShouldBe(0);

var operation = new BumpStaleIncomingEnvelopesOperation(
new DbObjectName("stale_outbox", DatabaseConstants.IncomingTable), theHost.GetRuntime().Options.Durability, DateTimeOffset.UtcNow);

var batch = new DatabaseOperationBatch((IMessageDatabase)messageStore, [operation]);
await theHost.InvokeAsync(batch);

var envelopesAfter = await messageStore.Admin.AllIncomingAsync();
envelopesAfter.Count(x => x.OwnerId == 0).ShouldBe(3);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ public IncomingEnvelopeTable(DurabilitySettings durability, string schemaName) :


AddColumn<DateTimeOffset>(DatabaseConstants.KeepUntil);

if (durability.InboxStaleTime.HasValue)
{
AddColumn<DateTimeOffset>(DatabaseConstants.Timestamp).DefaultValueByExpression("(now() at time zone 'utc')");
}

if (durability.EnableInboxPartitioning)
{
Expand All @@ -38,5 +43,7 @@ public IncomingEnvelopeTable(DurabilitySettings durability, string schemaName) :
.AddPartition("scheduled", EnvelopeStatus.Scheduled.ToString())
.AddPartition("handled", EnvelopeStatus.Handled.ToString());
}


}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System.Data.Common;
using Weasel.Core;
using Wolverine.RDBMS.Polling;
using Wolverine.Runtime.Agents;
using DbCommandBuilder = Weasel.Core.DbCommandBuilder;

namespace Wolverine.RDBMS.Durability;

internal class BumpStaleIncomingEnvelopesOperation : IDatabaseOperation, IDoNotReturnData
{
private readonly DbObjectName _incomingTable;
private readonly DurabilitySettings _durability;
private readonly DateTimeOffset _timestamp;

public BumpStaleIncomingEnvelopesOperation(DbObjectName incomingTable, DurabilitySettings durability, DateTimeOffset utcNow)
{
_incomingTable = incomingTable;
_durability = durability;
_timestamp = utcNow.Subtract(_durability.InboxStaleTime.Value);
}

public string Description => "Bump stale or stuck inbox messages to be picked up by other nodes";

public void ConfigureCommand(DbCommandBuilder builder)
{
builder.Append(
$"update {_incomingTable} set {DatabaseConstants.OwnerId} = 0 where {DatabaseConstants.OwnerId} != 0 and {DatabaseConstants.Timestamp} <= ");
builder.AppendParameter(_timestamp);
builder.Append(';');
}

public Task ReadResultsAsync(DbDataReader reader, IList<Exception> exceptions, CancellationToken token)
{
return Task.CompletedTask;
}

public IEnumerable<IAgentCommand> PostProcessingCommands()
{
yield break;
}
}
5 changes: 5 additions & 0 deletions src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@ private IDatabaseOperation[] buildOperationBatch()
ops.Add(new BumpStaleOutgoingEnvelopesOperation(new DbObjectName(_database.SchemaName, DatabaseConstants.OutgoingTable), _runtime.Options.Durability, DateTimeOffset.UtcNow));
}

if (_runtime.Options.Durability.InboxStaleTime.HasValue)
{
ops.Add(new BumpStaleIncomingEnvelopesOperation(new DbObjectName(_database.SchemaName, DatabaseConstants.IncomingTable), _runtime.Options.Durability, DateTimeOffset.UtcNow));
}

return ops.ToArray();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,10 @@ public IncomingEnvelopeTable(DurabilitySettings durability, string schemaName) :
}

AddColumn<DateTimeOffset>(DatabaseConstants.KeepUntil);

if (durability.InboxStaleTime.HasValue)
{
AddColumn<DateTimeOffset>(DatabaseConstants.Timestamp).DefaultValueByExpression("GETUTCDATE()");
}
}
}
7 changes: 7 additions & 0 deletions src/Wolverine/DurabilitySettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ internal set
/// </summary>
public TimeSpan? OutboxStaleTime { get; set; }

/// <summary>
/// If non-null, this directs Wolverine to "push" any message in the durable inbox that is older
/// than the configured time even if the message is marked as owned by an active node. Should NOT ever
/// be necessary, but it's an imperfect world. Enable this if you see "stuck" envelopes
/// </summary>
public TimeSpan? InboxStaleTime { get; set; }

/// <summary>
/// For persistence mechanisms that support this (PostgreSQL), this directs Wolverine to use partitioning
/// based on the envelope status for the transactional inbox storage. This can be a performance optimization,
Expand Down
Loading