diff --git a/docs/guide/durability/index.md b/docs/guide/durability/index.md
index 989e41255..4ff2c57c8 100644
--- a/docs/guide/durability/index.md
+++ b/docs/guide/durability/index.md
@@ -142,7 +142,7 @@ using var host = await Host.CreateDefaultBuilder()
snippet source | anchor
-### Bumping out Stale Outbox Messages
+### Bumping out Stale Inbox/Outbox Messages
It should *not* be possible for there to be any path where a message gets "stuck" in the outbox tables without eventually
being sent by the originating node or recovered by a different node if the original node goes down first. However, it's
@@ -151,12 +151,12 @@ you can "bump" a persisted record in the `wolverine_outgoing_envelopes` to be re
setting the `owner_id` field to zero.
::: info
-Just be aware that opting into the `OutboxStaleTime` threshold will require database changes through Wolverine's database
+Just be aware that opting into the `OutboxStaleTime` or `InboxStaleTime` threshold will require database changes through Wolverine's database
migration subsystem
:::
You also have this setting to force Wolverine to automatically "bump" and older messages that seem to be stalled in
-the outbox table:
+the outbox table or the inbox table:
diff --git a/src/Persistence/PersistenceTests/Samples/DocumentationSamples.cs b/src/Persistence/PersistenceTests/Samples/DocumentationSamples.cs
index 561818c2f..501898f3a 100644
--- a/src/Persistence/PersistenceTests/Samples/DocumentationSamples.cs
+++ b/src/Persistence/PersistenceTests/Samples/DocumentationSamples.cs
@@ -288,6 +288,12 @@ public static async Task options_for_bumping_stale_outbox_messages()
// so that the durability agent can recover it and force
// it to be sent
opts.Durability.OutboxStaleTime = 1.Hours();
+
+ // Same for the inbox, but it's configured independently
+ // This should *never* be necessary and the Wolverine
+ // team has no clue why this could ever happen and a message
+ // could get "stuck", but yet, here this is:
+ opts.Durability.InboxStaleTime = 10.Minutes();
}).StartAsync();
#endregion
diff --git a/src/Persistence/PostgresqlTests/bumping_stale_inbox_messages.cs b/src/Persistence/PostgresqlTests/bumping_stale_inbox_messages.cs
new file mode 100644
index 000000000..c6c98d84f
--- /dev/null
+++ b/src/Persistence/PostgresqlTests/bumping_stale_inbox_messages.cs
@@ -0,0 +1,107 @@
+using IntegrationTests;
+using JasperFx.Core;
+using Microsoft.Extensions.Hosting;
+using Npgsql;
+using Shouldly;
+using Weasel.Core;
+using Weasel.Postgresql.Tables;
+using Wolverine;
+using Wolverine.ComplianceTests;
+using Wolverine.Postgresql;
+using Wolverine.RDBMS;
+using Wolverine.RDBMS.Durability;
+using Wolverine.RDBMS.Polling;
+using Wolverine.Runtime;
+using Wolverine.Tracking;
+
+namespace PostgresqlTests;
+
+public class bumping_stale_inbox_messages : IAsyncLifetime
+{
+ private IHost theHost;
+
+ public async Task InitializeAsync()
+ {
+ theHost = await Host.CreateDefaultBuilder()
+ .UseWolverine(opts =>
+ {
+ opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, "stale_outbox");
+ opts.Durability.InboxStaleTime = 1.Hours();
+ }).StartAsync();
+
+ await theHost.RebuildAllEnvelopeStorageAsync();
+ }
+
+ public async Task DisposeAsync()
+ {
+ await theHost.StopAsync();
+ }
+
+ [Fact]
+ public async Task got_the_right_column()
+ {
+ using var conn = new NpgsqlConnection(Servers.PostgresConnectionString);
+ await conn.OpenAsync();
+
+ var table = await new Table(new DbObjectName("stale_outbox", DatabaseConstants.IncomingTable)).FetchExistingAsync(conn);
+
+ table.HasColumn(DatabaseConstants.Timestamp).ShouldBeTrue();
+
+ }
+
+ [Fact]
+ public async Task smoke_test_on_the_persist()
+ {
+ var envelope = ObjectMother.Envelope();
+ var messageStore = theHost.GetRuntime().Storage;
+ await messageStore.Inbox.StoreIncomingAsync(envelope);
+ }
+
+ [Fact]
+ public async Task using_the_operation()
+ {
+ var envelope1 = ObjectMother.Envelope();
+ var envelope2 = ObjectMother.Envelope();
+ var envelope3 = ObjectMother.Envelope();
+ var envelope4 = ObjectMother.Envelope();
+ var envelope5 = ObjectMother.Envelope();
+
+
+ var messageStore = theHost.GetRuntime().Storage;
+ await messageStore.Inbox.StoreIncomingAsync(envelope1);
+ await messageStore.Inbox.StoreIncomingAsync(envelope2);
+ await messageStore.Inbox.StoreIncomingAsync(envelope3);
+ await messageStore.Inbox.StoreIncomingAsync(envelope4);
+ await messageStore.Inbox.StoreIncomingAsync(envelope5);
+
+ using var conn = new NpgsqlConnection(Servers.PostgresConnectionString);
+ await conn.OpenAsync();
+ await conn.CreateCommand("update stale_outbox.wolverine_incoming_envelopes set \"timestamp\" = :time where id = :id")
+ .With("time", DateTimeOffset.UtcNow.Subtract(2.Hours()))
+ .With("id", envelope1.Id)
+ .ExecuteNonQueryAsync();
+
+ await conn.CreateCommand("update stale_outbox.wolverine_incoming_envelopes set \"timestamp\" = :time where id = :id")
+ .With("time", DateTimeOffset.UtcNow.Subtract(2.Hours()))
+ .With("id", envelope3.Id)
+ .ExecuteNonQueryAsync();
+
+ await conn.CreateCommand("update stale_outbox.wolverine_incoming_envelopes set \"timestamp\" = :time where id = :id")
+ .With("time", DateTimeOffset.UtcNow.Subtract(2.Hours()))
+ .With("id", envelope5.Id)
+ .ExecuteNonQueryAsync();
+
+ var envelopesBefore = await messageStore.Admin.AllIncomingAsync();
+ envelopesBefore.Count(x => x.OwnerId == 0).ShouldBe(0);
+
+ var operation = new BumpStaleIncomingEnvelopesOperation(
+ new DbObjectName("stale_outbox", DatabaseConstants.IncomingTable), theHost.GetRuntime().Options.Durability, DateTimeOffset.UtcNow);
+
+ var batch = new DatabaseOperationBatch((IMessageDatabase)messageStore, [operation]);
+ await theHost.InvokeAsync(batch);
+
+ var envelopesAfter = await messageStore.Admin.AllIncomingAsync();
+ envelopesAfter.Count(x => x.OwnerId == 0).ShouldBe(3);
+
+ }
+}
\ No newline at end of file
diff --git a/src/Persistence/SqlServerTests/bumping_stale_inbox_messages.cs b/src/Persistence/SqlServerTests/bumping_stale_inbox_messages.cs
new file mode 100644
index 000000000..67e339dc4
--- /dev/null
+++ b/src/Persistence/SqlServerTests/bumping_stale_inbox_messages.cs
@@ -0,0 +1,107 @@
+using IntegrationTests;
+using JasperFx.Core;
+using Microsoft.Data.SqlClient;
+using Microsoft.Extensions.Hosting;
+using Shouldly;
+using Weasel.Core;
+using Weasel.SqlServer.Tables;
+using Wolverine;
+using Wolverine.ComplianceTests;
+using Wolverine.RDBMS;
+using Wolverine.RDBMS.Durability;
+using Wolverine.RDBMS.Polling;
+using Wolverine.Runtime;
+using Wolverine.SqlServer;
+using Wolverine.Tracking;
+
+namespace SqlServerTests;
+
+public class bumping_stale_inbox_messages : IAsyncLifetime
+{
+ private IHost theHost;
+
+ public async Task InitializeAsync()
+ {
+ theHost = await Host.CreateDefaultBuilder()
+ .UseWolverine(opts =>
+ {
+ opts.PersistMessagesWithSqlServer(Servers.SqlServerConnectionString, "stale_outbox");
+ opts.Durability.InboxStaleTime = 1.Hours();
+ }).StartAsync();
+
+ await theHost.RebuildAllEnvelopeStorageAsync();
+ }
+
+ public async Task DisposeAsync()
+ {
+ await theHost.StopAsync();
+ }
+
+ [Fact]
+ public async Task got_the_right_column()
+ {
+ using var conn = new SqlConnection(Servers.SqlServerConnectionString);
+ await conn.OpenAsync();
+
+ var table = await new Table(new DbObjectName("stale_outbox", DatabaseConstants.IncomingTable)).FetchExistingAsync(conn);
+
+ table.HasColumn(DatabaseConstants.Timestamp).ShouldBeTrue();
+
+ }
+
+ [Fact]
+ public async Task smoke_test_on_the_persist()
+ {
+ var envelope = ObjectMother.Envelope();
+ var messageStore = theHost.GetRuntime().Storage;
+ await messageStore.Inbox.StoreIncomingAsync(envelope);
+ }
+
+ [Fact]
+ public async Task using_the_operation()
+ {
+ var envelope1 = ObjectMother.Envelope();
+ var envelope2 = ObjectMother.Envelope();
+ var envelope3 = ObjectMother.Envelope();
+ var envelope4 = ObjectMother.Envelope();
+ var envelope5 = ObjectMother.Envelope();
+
+
+ var messageStore = theHost.GetRuntime().Storage;
+ await messageStore.Inbox.StoreIncomingAsync(envelope1);
+ await messageStore.Inbox.StoreIncomingAsync(envelope2);
+ await messageStore.Inbox.StoreIncomingAsync(envelope3);
+ await messageStore.Inbox.StoreIncomingAsync(envelope4);
+ await messageStore.Inbox.StoreIncomingAsync(envelope5);
+
+ using var conn = new SqlConnection(Servers.SqlServerConnectionString);
+ await conn.OpenAsync();
+ await conn.CreateCommand("update stale_outbox.wolverine_incoming_envelopes set timestamp = @time where id = @id")
+ .With("time", DateTimeOffset.UtcNow.Subtract(2.Hours()))
+ .With("id", envelope1.Id)
+ .ExecuteNonQueryAsync();
+
+ await conn.CreateCommand("update stale_outbox.wolverine_incoming_envelopes set timestamp = @time where id = @id")
+ .With("time", DateTimeOffset.UtcNow.Subtract(2.Hours()))
+ .With("id", envelope3.Id)
+ .ExecuteNonQueryAsync();
+
+ await conn.CreateCommand("update stale_outbox.wolverine_incoming_envelopes set timestamp = @time where id = @id")
+ .With("time", DateTimeOffset.UtcNow.Subtract(2.Hours()))
+ .With("id", envelope5.Id)
+ .ExecuteNonQueryAsync();
+
+ var envelopesBefore = await messageStore.Admin.AllIncomingAsync();
+ envelopesBefore.Count(x => x.OwnerId == 0).ShouldBe(0);
+
+ var operation = new BumpStaleIncomingEnvelopesOperation(
+ new DbObjectName("stale_outbox", DatabaseConstants.IncomingTable), theHost.GetRuntime().Options.Durability, DateTimeOffset.UtcNow);
+
+ var batch = new DatabaseOperationBatch((IMessageDatabase)messageStore, [operation]);
+ await theHost.InvokeAsync(batch);
+
+ var envelopesAfter = await messageStore.Admin.AllIncomingAsync();
+ envelopesAfter.Count(x => x.OwnerId == 0).ShouldBe(3);
+
+ }
+}
\ No newline at end of file
diff --git a/src/Persistence/Wolverine.Postgresql/Schema/IncomingEnvelopeTable.cs b/src/Persistence/Wolverine.Postgresql/Schema/IncomingEnvelopeTable.cs
index 8d1ccd295..e1832556e 100644
--- a/src/Persistence/Wolverine.Postgresql/Schema/IncomingEnvelopeTable.cs
+++ b/src/Persistence/Wolverine.Postgresql/Schema/IncomingEnvelopeTable.cs
@@ -29,6 +29,11 @@ public IncomingEnvelopeTable(DurabilitySettings durability, string schemaName) :
AddColumn(DatabaseConstants.KeepUntil);
+
+ if (durability.InboxStaleTime.HasValue)
+ {
+ AddColumn(DatabaseConstants.Timestamp).DefaultValueByExpression("(now() at time zone 'utc')");
+ }
if (durability.EnableInboxPartitioning)
{
@@ -38,5 +43,7 @@ public IncomingEnvelopeTable(DurabilitySettings durability, string schemaName) :
.AddPartition("scheduled", EnvelopeStatus.Scheduled.ToString())
.AddPartition("handled", EnvelopeStatus.Handled.ToString());
}
+
+
}
}
\ No newline at end of file
diff --git a/src/Persistence/Wolverine.RDBMS/Durability/BumpStaleIncomingEnvelopesOperation.cs b/src/Persistence/Wolverine.RDBMS/Durability/BumpStaleIncomingEnvelopesOperation.cs
new file mode 100644
index 000000000..e9bf7165f
--- /dev/null
+++ b/src/Persistence/Wolverine.RDBMS/Durability/BumpStaleIncomingEnvelopesOperation.cs
@@ -0,0 +1,41 @@
+using System.Data.Common;
+using Weasel.Core;
+using Wolverine.RDBMS.Polling;
+using Wolverine.Runtime.Agents;
+using DbCommandBuilder = Weasel.Core.DbCommandBuilder;
+
+namespace Wolverine.RDBMS.Durability;
+
+internal class BumpStaleIncomingEnvelopesOperation : IDatabaseOperation, IDoNotReturnData
+{
+ private readonly DbObjectName _incomingTable;
+ private readonly DurabilitySettings _durability;
+ private readonly DateTimeOffset _timestamp;
+
+ public BumpStaleIncomingEnvelopesOperation(DbObjectName incomingTable, DurabilitySettings durability, DateTimeOffset utcNow)
+ {
+ _incomingTable = incomingTable;
+ _durability = durability;
+ _timestamp = utcNow.Subtract(_durability.InboxStaleTime.Value);
+ }
+
+ public string Description => "Bump stale or stuck inbox messages to be picked up by other nodes";
+
+ public void ConfigureCommand(DbCommandBuilder builder)
+ {
+ builder.Append(
+ $"update {_incomingTable} set {DatabaseConstants.OwnerId} = 0 where {DatabaseConstants.OwnerId} != 0 and {DatabaseConstants.Timestamp} <= ");
+ builder.AppendParameter(_timestamp);
+ builder.Append(';');
+ }
+
+ public Task ReadResultsAsync(DbDataReader reader, IList exceptions, CancellationToken token)
+ {
+ return Task.CompletedTask;
+ }
+
+ public IEnumerable PostProcessingCommands()
+ {
+ yield break;
+ }
+}
\ No newline at end of file
diff --git a/src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs b/src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs
index 728f201d7..88760759f 100644
--- a/src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs
+++ b/src/Persistence/Wolverine.RDBMS/DurabilityAgent.cs
@@ -179,6 +179,11 @@ private IDatabaseOperation[] buildOperationBatch()
ops.Add(new BumpStaleOutgoingEnvelopesOperation(new DbObjectName(_database.SchemaName, DatabaseConstants.OutgoingTable), _runtime.Options.Durability, DateTimeOffset.UtcNow));
}
+ if (_runtime.Options.Durability.InboxStaleTime.HasValue)
+ {
+ ops.Add(new BumpStaleIncomingEnvelopesOperation(new DbObjectName(_database.SchemaName, DatabaseConstants.IncomingTable), _runtime.Options.Durability, DateTimeOffset.UtcNow));
+ }
+
return ops.ToArray();
}
diff --git a/src/Persistence/Wolverine.SqlServer/Schema/IncomingEnvelopeTable.cs b/src/Persistence/Wolverine.SqlServer/Schema/IncomingEnvelopeTable.cs
index 852e57b03..aa36945a0 100644
--- a/src/Persistence/Wolverine.SqlServer/Schema/IncomingEnvelopeTable.cs
+++ b/src/Persistence/Wolverine.SqlServer/Schema/IncomingEnvelopeTable.cs
@@ -28,5 +28,10 @@ public IncomingEnvelopeTable(DurabilitySettings durability, string schemaName) :
}
AddColumn(DatabaseConstants.KeepUntil);
+
+ if (durability.InboxStaleTime.HasValue)
+ {
+ AddColumn(DatabaseConstants.Timestamp).DefaultValueByExpression("GETUTCDATE()");
+ }
}
}
\ No newline at end of file
diff --git a/src/Wolverine/DurabilitySettings.cs b/src/Wolverine/DurabilitySettings.cs
index 295704b98..0e5cd8f04 100644
--- a/src/Wolverine/DurabilitySettings.cs
+++ b/src/Wolverine/DurabilitySettings.cs
@@ -98,6 +98,13 @@ internal set
///
public TimeSpan? OutboxStaleTime { get; set; }
+ ///
+ /// If non-null, this directs Wolverine to "push" any message in the durable inbox that is older
+ /// than the configured time even if the message is marked as owned by an active node. Should NOT ever
+ /// be necessary, but it's an imperfect world. Enable this if you see "stuck" envelopes
+ ///
+ public TimeSpan? InboxStaleTime { get; set; }
+
///
/// For persistence mechanisms that support this (PostgreSQL), this directs Wolverine to use partitioning
/// based on the envelope status for the transactional inbox storage. This can be a performance optimization,