From 60c928e121ad3b488cc9054e2bee93668bfb8fee Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Mon, 1 Dec 2025 08:54:16 -0600 Subject: [PATCH 1/3] Envelope.KeepUntil & round trip serialization --- .../serialization_and_deserialization.cs | 9 ++++++- src/Wolverine/Envelope.cs | 6 +++++ src/Wolverine/EnvelopeConstants.cs | 1 + .../Serialization/EnvelopeSerializer.cs | 27 +++++++++++++++++++ 4 files changed, 42 insertions(+), 1 deletion(-) diff --git a/src/Testing/CoreTests/Serialization/serialization_and_deserialization.cs b/src/Testing/CoreTests/Serialization/serialization_and_deserialization.cs index f4e25d3f5..688321ba2 100644 --- a/src/Testing/CoreTests/Serialization/serialization_and_deserialization.cs +++ b/src/Testing/CoreTests/Serialization/serialization_and_deserialization.cs @@ -20,7 +20,8 @@ public serialization_and_deserialization_of_single_message() DeliverBy = DateTime.Today.ToUniversalTime(), ReplyUri = "tcp://localhost:2221/replies".ToUri(), SagaId = Guid.NewGuid().ToString(), - ParentId = Guid.NewGuid().ToString() + ParentId = Guid.NewGuid().ToString(), + KeepUntil = DateTime.Today.AddDays(1).ToUniversalTime() }; outgoing.Headers.Add("name", "Jeremy"); @@ -222,4 +223,10 @@ public void partition_key() outgoing.PartitionKey = Guid.NewGuid().ToString(); incoming.PartitionKey.ShouldBe(outgoing.PartitionKey); } + + [Fact] + public void keep_until() + { + incoming.KeepUntil.Value.ShouldBe(outgoing.KeepUntil.Value); + } } \ No newline at end of file diff --git a/src/Wolverine/Envelope.cs b/src/Wolverine/Envelope.cs index 7cd383a15..685ec9068 100644 --- a/src/Wolverine/Envelope.cs +++ b/src/Wolverine/Envelope.cs @@ -471,4 +471,10 @@ public static Envelope ForPersistedHandled(Envelope original) MessageType = original.MessageType, }; } + + /// + /// Marks the time stamp for how long this envelope should be retained as + /// "Handled" in the inbox for idempotency protections + /// + public DateTimeOffset? KeepUntil { get; set; } } \ No newline at end of file diff --git a/src/Wolverine/EnvelopeConstants.cs b/src/Wolverine/EnvelopeConstants.cs index 788ce6518..6f457f626 100644 --- a/src/Wolverine/EnvelopeConstants.cs +++ b/src/Wolverine/EnvelopeConstants.cs @@ -25,4 +25,5 @@ public static class EnvelopeConstants public const string GroupIdKey = "group-id"; public const string PartitionKey = "partition-key"; public const string TopicNameKey = "topic-name"; + public const string KeepUntilKey = "keep-until"; } \ No newline at end of file diff --git a/src/Wolverine/Runtime/Serialization/EnvelopeSerializer.cs b/src/Wolverine/Runtime/Serialization/EnvelopeSerializer.cs index 235843d6e..c338d9329 100644 --- a/src/Wolverine/Runtime/Serialization/EnvelopeSerializer.cs +++ b/src/Wolverine/Runtime/Serialization/EnvelopeSerializer.cs @@ -95,6 +95,23 @@ public static void ReadDataElement(Envelope env, string key, string value) } } break; + + case EnvelopeConstants.KeepUntilKey: + // Don't read it twice + if (env.KeepUntil.HasValue) return; + + try + { + env.KeepUntil = XmlConvert.ToDateTime(value, XmlDateTimeSerializationMode.Utc); + } + catch (Exception ) + { + if (DateTimeOffset.TryParse(value, out var dt)) + { + env.KeepUntil = dt; + } + } + break; case EnvelopeConstants.AttemptsKey: env.Attempts = int.Parse(value); @@ -242,6 +259,7 @@ private static int writeHeaders(BinaryWriter writer, Envelope env) writer.WriteProp(ref count, EnvelopeConstants.ParentIdKey, env.ParentId); writer.WriteProp(ref count, EnvelopeConstants.TenantIdKey, env.TenantId); writer.WriteProp(ref count, EnvelopeConstants.TopicNameKey, env.TopicName); + if (env.AcceptedContentTypes.Length != 0) { @@ -265,6 +283,15 @@ private static int writeHeaders(BinaryWriter writer, Envelope env) writer.Write(dateString); } + if (env.KeepUntil.HasValue) + { + var dateString = env.KeepUntil.Value.ToUniversalTime() + .ToString("yyyy-MM-ddTHH:mm:ss.fffffff", CultureInfo.InvariantCulture); + count++; + writer.Write(EnvelopeConstants.KeepUntilKey); + writer.Write(dateString); + } + writer.WriteProp(ref count, EnvelopeConstants.AttemptsKey, env.Attempts); writer.WriteProp(ref count, EnvelopeConstants.DeliverByKey, env.DeliverBy); From 31397c0fcd8960feaec908cbf7554db49dd55877 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Mon, 1 Dec 2025 09:53:30 -0600 Subject: [PATCH 2/3] Assigning Envelope.KeepUntil in the ForPersistedHandled --- ...cy_with_non_wolverine_mapped_db_context.cs | 7 +-- ...dd_dbcontext_with_wolverine_integration.cs | 7 +-- ...cy_check_in_marten_envelope_transaction.cs | 8 ++- .../Internals/EfCoreEnvelopeTransaction.cs | 9 ++-- .../FlushOutgoingMessagesOnCommit.cs | 2 +- .../MartenEnvelopeTransaction.cs | 5 +- .../DatabaseEnvelopeTransaction.cs | 5 +- .../Internals/RavenDbEnvelopeTransaction.cs | 5 +- src/Testing/CoreTests/EnvelopeTests.cs | 52 +++++++++++++++++++ src/Wolverine/Envelope.cs | 4 +- .../Durability/IEnvelopeTransaction.cs | 3 +- src/Wolverine/Runtime/MessageContext.cs | 5 +- 12 files changed, 85 insertions(+), 27 deletions(-) diff --git a/src/Persistence/EfCoreTests/eager_idempotency_with_non_wolverine_mapped_db_context.cs b/src/Persistence/EfCoreTests/eager_idempotency_with_non_wolverine_mapped_db_context.cs index 69d542e3e..314993e62 100644 --- a/src/Persistence/EfCoreTests/eager_idempotency_with_non_wolverine_mapped_db_context.cs +++ b/src/Persistence/EfCoreTests/eager_idempotency_with_non_wolverine_mapped_db_context.cs @@ -39,7 +39,7 @@ public async Task happy_path_eager_idempotency() var transaction = new EfCoreEnvelopeTransaction(dbContext, context); - var ok = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, CancellationToken.None); + var ok = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, new DurabilitySettings(), CancellationToken.None); ok.ShouldBeTrue(); await dbContext.Database.CurrentTransaction!.CommitAsync(); @@ -66,14 +66,15 @@ public async Task sad_path_eager_idempotency() var transaction = new EfCoreEnvelopeTransaction(dbContext, context); - var ok = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, CancellationToken.None); + var durabilitySettings = new DurabilitySettings(); + var ok = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, durabilitySettings, CancellationToken.None); ok.ShouldBeTrue(); await dbContext.Database.CurrentTransaction!.CommitAsync(); // Kind of resetting it here envelope.WasPersistedInInbox = false; - var secondTime = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, CancellationToken.None); + var secondTime = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, durabilitySettings, CancellationToken.None); secondTime.ShouldBeFalse(); diff --git a/src/Persistence/EfCoreTests/using_add_dbcontext_with_wolverine_integration.cs b/src/Persistence/EfCoreTests/using_add_dbcontext_with_wolverine_integration.cs index 95ded15f2..54d4f3dbf 100644 --- a/src/Persistence/EfCoreTests/using_add_dbcontext_with_wolverine_integration.cs +++ b/src/Persistence/EfCoreTests/using_add_dbcontext_with_wolverine_integration.cs @@ -73,7 +73,7 @@ public async Task happy_path_eager_idempotency() var transaction = new EfCoreEnvelopeTransaction(dbContext, context); - var ok = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, CancellationToken.None); + var ok = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, new DurabilitySettings(), CancellationToken.None); ok.ShouldBeTrue(); await dbContext.Database.CurrentTransaction!.CommitAsync(); @@ -102,14 +102,15 @@ public async Task sad_path_eager_idempotency() var transaction = new EfCoreEnvelopeTransaction(dbContext, context); - var ok = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, CancellationToken.None); + var durabilitySettings = new DurabilitySettings(); + var ok = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, durabilitySettings, CancellationToken.None); ok.ShouldBeTrue(); await dbContext.Database.CurrentTransaction!.CommitAsync(); // Kind of resetting it here envelope.WasPersistedInInbox = false; - var secondTime = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, CancellationToken.None); + var secondTime = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, durabilitySettings, CancellationToken.None); secondTime.ShouldBeFalse(); diff --git a/src/Persistence/MartenTests/idempotency_check_in_marten_envelope_transaction.cs b/src/Persistence/MartenTests/idempotency_check_in_marten_envelope_transaction.cs index fd1d51f49..3751544b6 100644 --- a/src/Persistence/MartenTests/idempotency_check_in_marten_envelope_transaction.cs +++ b/src/Persistence/MartenTests/idempotency_check_in_marten_envelope_transaction.cs @@ -53,7 +53,7 @@ public async Task happy_path_eager_idempotency() using var session = _host.DocumentStore().LightweightSession(); var transaction = new MartenEnvelopeTransaction(session, context); - var ok = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, CancellationToken.None); + var ok = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, new DurabilitySettings(), CancellationToken.None); ok.ShouldBeTrue(); var persisted = (await runtime.Storage.Admin.AllIncomingAsync()).Single(x => x.Id == envelope.Id); @@ -77,16 +77,14 @@ public async Task sad_path_eager_idempotency() using var session = _host.DocumentStore().LightweightSession(); var transaction = new MartenEnvelopeTransaction(session, context); - var ok = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, CancellationToken.None); + var ok = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, new DurabilitySettings(), CancellationToken.None); ok.ShouldBeTrue(); // Kind of resetting it here envelope.WasPersistedInInbox = false; - var secondTime = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, CancellationToken.None); + var secondTime = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, new DurabilitySettings(), CancellationToken.None); secondTime.ShouldBeFalse(); - - } } diff --git a/src/Persistence/Wolverine.EntityFrameworkCore/Internals/EfCoreEnvelopeTransaction.cs b/src/Persistence/Wolverine.EntityFrameworkCore/Internals/EfCoreEnvelopeTransaction.cs index 348caa01c..585d3ace1 100644 --- a/src/Persistence/Wolverine.EntityFrameworkCore/Internals/EfCoreEnvelopeTransaction.cs +++ b/src/Persistence/Wolverine.EntityFrameworkCore/Internals/EfCoreEnvelopeTransaction.cs @@ -17,7 +17,7 @@ public class EfCoreEnvelopeTransaction : IEnvelopeTransaction { private readonly MessageContext _messaging; private readonly IMessageDatabase _database; - + public EfCoreEnvelopeTransaction(DbContext dbContext, MessageContext messaging) { _messaging = messaging; @@ -125,7 +125,8 @@ public ValueTask RollbackAsync() return ValueTask.CompletedTask; } - public async Task TryMakeEagerIdempotencyCheckAsync(Envelope envelope, CancellationToken cancellation) + public async Task TryMakeEagerIdempotencyCheckAsync(Envelope envelope, DurabilitySettings settings, + CancellationToken cancellation) { if (envelope.WasPersistedInInbox) return true; @@ -136,7 +137,7 @@ public async Task TryMakeEagerIdempotencyCheckAsync(Envelope envelope, Can try { - var copy = Envelope.ForPersistedHandled(envelope); + var copy = Envelope.ForPersistedHandled(envelope, DateTimeOffset.UtcNow, settings); await PersistIncomingAsync(copy); // Gotta flush the call to the database! @@ -181,7 +182,7 @@ public async ValueTask CommitAsync(CancellationToken cancellation) // handled messages for the sake of idempotency else { - var envelope = Envelope.ForPersistedHandled(_messaging.Envelope); + var envelope = Envelope.ForPersistedHandled(_messaging.Envelope, DateTimeOffset.UtcNow, _messaging.Runtime.Options.Durability); await PersistIncomingAsync(envelope); } diff --git a/src/Persistence/Wolverine.Marten/FlushOutgoingMessagesOnCommit.cs b/src/Persistence/Wolverine.Marten/FlushOutgoingMessagesOnCommit.cs index 52d5640fd..b544eac44 100644 --- a/src/Persistence/Wolverine.Marten/FlushOutgoingMessagesOnCommit.cs +++ b/src/Persistence/Wolverine.Marten/FlushOutgoingMessagesOnCommit.cs @@ -35,7 +35,7 @@ public override Task BeforeSaveChangesAsync(IDocumentSession session, Cancellati } else { - var envelope = Envelope.ForPersistedHandled(_context.Envelope); + var envelope = Envelope.ForPersistedHandled(_context.Envelope, DateTimeOffset.UtcNow, _context.Runtime.Options.Durability); session.QueueOperation(new StoreIncomingEnvelope(_messageStore.IncomingFullName, envelope)); } diff --git a/src/Persistence/Wolverine.Marten/MartenEnvelopeTransaction.cs b/src/Persistence/Wolverine.Marten/MartenEnvelopeTransaction.cs index f74a83880..798f85912 100644 --- a/src/Persistence/Wolverine.Marten/MartenEnvelopeTransaction.cs +++ b/src/Persistence/Wolverine.Marten/MartenEnvelopeTransaction.cs @@ -63,7 +63,8 @@ public ValueTask RollbackAsync() return ValueTask.CompletedTask; } - public async Task TryMakeEagerIdempotencyCheckAsync(Envelope envelope, CancellationToken cancellation) + public async Task TryMakeEagerIdempotencyCheckAsync(Envelope envelope, DurabilitySettings settings, + CancellationToken cancellation) { if (envelope.WasPersistedInInbox) return true; @@ -71,7 +72,7 @@ public async Task TryMakeEagerIdempotencyCheckAsync(Envelope envelope, Can { // Might need to reset! _context.MultiFlushMode = MultiFlushMode.AllowMultiples; - var copy = Envelope.ForPersistedHandled(envelope); + var copy = Envelope.ForPersistedHandled(envelope, DateTimeOffset.UtcNow, settings); await PersistIncomingAsync(copy); await Session.SaveChangesAsync(cancellation); diff --git a/src/Persistence/Wolverine.RDBMS/DatabaseEnvelopeTransaction.cs b/src/Persistence/Wolverine.RDBMS/DatabaseEnvelopeTransaction.cs index ba9416338..a4c3640fb 100644 --- a/src/Persistence/Wolverine.RDBMS/DatabaseEnvelopeTransaction.cs +++ b/src/Persistence/Wolverine.RDBMS/DatabaseEnvelopeTransaction.cs @@ -40,9 +40,10 @@ public Task PersistIncomingAsync(Envelope envelope) return _database.StoreIncomingAsync(_tx, [envelope]); } - public async Task TryMakeEagerIdempotencyCheckAsync(Envelope envelope, CancellationToken cancellation) + public async Task TryMakeEagerIdempotencyCheckAsync(Envelope envelope, DurabilitySettings settings, + CancellationToken cancellation) { - var copy = Envelope.ForPersistedHandled(envelope); + var copy = Envelope.ForPersistedHandled(envelope, DateTimeOffset.UtcNow, settings); try { await PersistIncomingAsync(copy); diff --git a/src/Persistence/Wolverine.RavenDb/Internals/RavenDbEnvelopeTransaction.cs b/src/Persistence/Wolverine.RavenDb/Internals/RavenDbEnvelopeTransaction.cs index 0d9701293..00a0c1e13 100644 --- a/src/Persistence/Wolverine.RavenDb/Internals/RavenDbEnvelopeTransaction.cs +++ b/src/Persistence/Wolverine.RavenDb/Internals/RavenDbEnvelopeTransaction.cs @@ -25,9 +25,10 @@ public RavenDbEnvelopeTransaction(IAsyncDocumentSession session, MessageContext Session = session; } - public async Task TryMakeEagerIdempotencyCheckAsync(Envelope envelope, CancellationToken cancellation) + public async Task TryMakeEagerIdempotencyCheckAsync(Envelope envelope, DurabilitySettings settings, + CancellationToken cancellation) { - var copy = Envelope.ForPersistedHandled(envelope); + var copy = Envelope.ForPersistedHandled(envelope, DateTimeOffset.UtcNow, settings); try { await PersistIncomingAsync(copy); diff --git a/src/Testing/CoreTests/EnvelopeTests.cs b/src/Testing/CoreTests/EnvelopeTests.cs index d49b102a4..42facb908 100644 --- a/src/Testing/CoreTests/EnvelopeTests.cs +++ b/src/Testing/CoreTests/EnvelopeTests.cs @@ -569,4 +569,56 @@ public void the_content_type_should_be_binary_envelope() theScheduledEnvelope.ContentType.ShouldBe(TransportConstants.SerializedEnvelope); } } + + public class when_building_an_envelope_for_persisted_handled + { + private readonly Envelope theOriginal; + private readonly DateTimeOffset now; + private readonly DurabilitySettings theSettings; + private readonly Envelope theHandledEnvelope; + + public when_building_an_envelope_for_persisted_handled() + { + theOriginal = ObjectMother.Envelope(); + theOriginal.Status = EnvelopeStatus.Incoming; + + now = DateTime.Today.ToUniversalTime(); + theSettings = new DurabilitySettings + { + KeepAfterMessageHandling = 5.Minutes() + }; + + theHandledEnvelope = Envelope.ForPersistedHandled(theOriginal, now, theSettings); + } + + [Fact] + public void status_should_be_handled() + { + theHandledEnvelope.Status.ShouldBe(EnvelopeStatus.Handled); + } + + [Fact] + public void keep_until_should_be_set() + { + theHandledEnvelope.KeepUntil.Value.ShouldBe(now.AddMinutes(5)); + } + + [Fact] + public void owner_is_any_node() + { + theHandledEnvelope.OwnerId.ShouldBe(0); + } + + [Fact] + public void destination() + { + theHandledEnvelope.Destination.ShouldBe(theOriginal.Destination); + } + + [Fact] + public void message_type() + { + theHandledEnvelope.MessageType.ShouldBe(theOriginal.MessageType); + } + } } \ No newline at end of file diff --git a/src/Wolverine/Envelope.cs b/src/Wolverine/Envelope.cs index 685ec9068..5333a783d 100644 --- a/src/Wolverine/Envelope.cs +++ b/src/Wolverine/Envelope.cs @@ -458,7 +458,7 @@ internal string GetMessageTypeName() /// internal IMessageStore? Store { get; set; } - public static Envelope ForPersistedHandled(Envelope original) + public static Envelope ForPersistedHandled(Envelope original, DateTimeOffset now, DurabilitySettings settings) { return new Envelope { @@ -467,8 +467,8 @@ public static Envelope ForPersistedHandled(Envelope original) OwnerId = 0, Status = EnvelopeStatus.Handled, Destination = original.Destination, - ScheduledTime = DateTimeOffset.UtcNow, MessageType = original.MessageType, + KeepUntil = now.Add(settings.KeepAfterMessageHandling) }; } diff --git a/src/Wolverine/Persistence/Durability/IEnvelopeTransaction.cs b/src/Wolverine/Persistence/Durability/IEnvelopeTransaction.cs index 0550e9a40..4f3920640 100644 --- a/src/Wolverine/Persistence/Durability/IEnvelopeTransaction.cs +++ b/src/Wolverine/Persistence/Durability/IEnvelopeTransaction.cs @@ -10,7 +10,8 @@ public interface IEnvelopeTransaction ValueTask RollbackAsync(); - Task TryMakeEagerIdempotencyCheckAsync(Envelope envelope, CancellationToken cancellation); + Task TryMakeEagerIdempotencyCheckAsync(Envelope envelope, DurabilitySettings settings, + CancellationToken cancellation); } public static class EnvelopeTransactionExtensions diff --git a/src/Wolverine/Runtime/MessageContext.cs b/src/Wolverine/Runtime/MessageContext.cs index 3895c3992..100f39ae5 100644 --- a/src/Wolverine/Runtime/MessageContext.cs +++ b/src/Wolverine/Runtime/MessageContext.cs @@ -48,7 +48,8 @@ public MessageContext(IWolverineRuntime runtime, string tenantId) : base(runtime TenantId = runtime.Options.Durability.TenantIdStyle.MaybeCorrectTenantId(tenantId); } - Task IEnvelopeTransaction.TryMakeEagerIdempotencyCheckAsync(Envelope envelope, CancellationToken cancellation) + Task IEnvelopeTransaction.TryMakeEagerIdempotencyCheckAsync(Envelope envelope, + DurabilitySettings settings, CancellationToken cancellation) { return Task.FromResult(true); } @@ -81,7 +82,7 @@ public async Task AssertEagerIdempotencyAsync(CancellationToken cancellation) if (Envelope == null || Envelope.WasPersistedInInbox ) return; if (Transaction == null) return; - var check = await Transaction.TryMakeEagerIdempotencyCheckAsync(Envelope, cancellation); + var check = await Transaction.TryMakeEagerIdempotencyCheckAsync(Envelope, Runtime.Options.Durability, cancellation); if (!check) { throw new DuplicateIncomingEnvelopeException(Envelope); From fd8b6642cd17a3d40e35725675fc81a74ef1bddc Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Mon, 1 Dec 2025 10:16:58 -0600 Subject: [PATCH 3/3] Correctly persisting KeepUntil with the new idempotency checks. Closes GH-1876 --- ...tency_with_non_wolverine_mapped_db_context.cs | 16 ++++++++++++++++ ...g_add_dbcontext_with_wolverine_integration.cs | 16 +++++++++++++--- ...tency_check_in_marten_envelope_transaction.cs | 1 + .../Internals/IncomingMessage.cs | 5 ++++- .../WolverineEntityCoreExtensions.cs | 1 + .../Operations/StoreIncomingEnvelope.cs | 2 ++ .../Wolverine.RDBMS/DatabaseConstants.cs | 2 +- .../Wolverine.RDBMS/DatabasePersistence.cs | 8 +++++++- 8 files changed, 45 insertions(+), 6 deletions(-) diff --git a/src/Persistence/EfCoreTests/eager_idempotency_with_non_wolverine_mapped_db_context.cs b/src/Persistence/EfCoreTests/eager_idempotency_with_non_wolverine_mapped_db_context.cs index 314993e62..c4631b83f 100644 --- a/src/Persistence/EfCoreTests/eager_idempotency_with_non_wolverine_mapped_db_context.cs +++ b/src/Persistence/EfCoreTests/eager_idempotency_with_non_wolverine_mapped_db_context.cs @@ -1,12 +1,16 @@ +using IntegrationTests; +using Microsoft.Data.SqlClient; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using NSubstitute; using SharedPersistenceModels.Items; using Shouldly; +using Weasel.SqlServer; using Weasel.SqlServer.Tables; using Wolverine; using Wolverine.ComplianceTests; using Wolverine.EntityFrameworkCore.Internals; +using Wolverine.RDBMS; using Wolverine.Runtime; using Wolverine.Tracking; using Wolverine.Transports; @@ -49,6 +53,18 @@ public async Task happy_path_eager_idempotency() persisted.Destination.ShouldBe(envelope.Destination); persisted.MessageType.ShouldBe(envelope.MessageType); persisted.Status.ShouldBe(EnvelopeStatus.Handled); + persisted.KeepUntil.HasValue.ShouldBeTrue(); + + using var conn = new SqlConnection(Servers.SqlServerConnectionString); + await conn.OpenAsync(); + + var raw = await conn + .CreateCommand($"select keep_until from idempotency.{DatabaseConstants.IncomingTable} where id = @id") + .With("id", persisted.Id) + .ExecuteScalarAsync(); + + raw.ShouldNotBeNull(); + raw.ShouldBeOfType().ShouldBeGreaterThan(DateTimeOffset.UtcNow); } diff --git a/src/Persistence/EfCoreTests/using_add_dbcontext_with_wolverine_integration.cs b/src/Persistence/EfCoreTests/using_add_dbcontext_with_wolverine_integration.cs index 54d4f3dbf..77f9af6f6 100644 --- a/src/Persistence/EfCoreTests/using_add_dbcontext_with_wolverine_integration.cs +++ b/src/Persistence/EfCoreTests/using_add_dbcontext_with_wolverine_integration.cs @@ -16,6 +16,7 @@ using Wolverine.EntityFrameworkCore; using Wolverine.EntityFrameworkCore.Internals; using Wolverine.Persistence; +using Wolverine.RDBMS; using Wolverine.Runtime; using Wolverine.SqlServer; using Wolverine.Tracking; @@ -78,14 +79,23 @@ public async Task happy_path_eager_idempotency() await dbContext.Database.CurrentTransaction!.CommitAsync(); - var all = await runtime.Storage.Admin.AllIncomingAsync(); - var persisted = (await runtime.Storage.Admin.AllIncomingAsync()).Single(x => x.Id == envelope.Id); persisted.Data.Length.ShouldBe(0); persisted.Destination.ShouldBe(envelope.Destination); persisted.MessageType.ShouldBe(envelope.MessageType); persisted.Status.ShouldBe(EnvelopeStatus.Handled); - + persisted.KeepUntil.HasValue.ShouldBeTrue(); + + using var conn = new SqlConnection(Servers.SqlServerConnectionString); + await conn.OpenAsync(); + + var raw = await conn + .CreateCommand($"select keep_until from idempotency.{DatabaseConstants.IncomingTable} where id = @id") + .With("id", persisted.Id) + .ExecuteScalarAsync(); + + raw.ShouldNotBeNull(); + raw.ShouldBeOfType().ShouldBeGreaterThan(DateTimeOffset.UtcNow); } [Fact] diff --git a/src/Persistence/MartenTests/idempotency_check_in_marten_envelope_transaction.cs b/src/Persistence/MartenTests/idempotency_check_in_marten_envelope_transaction.cs index 3751544b6..7f1d31aa0 100644 --- a/src/Persistence/MartenTests/idempotency_check_in_marten_envelope_transaction.cs +++ b/src/Persistence/MartenTests/idempotency_check_in_marten_envelope_transaction.cs @@ -61,6 +61,7 @@ public async Task happy_path_eager_idempotency() persisted.Destination.ShouldBe(envelope.Destination); persisted.MessageType.ShouldBe(envelope.MessageType); persisted.Status.ShouldBe(EnvelopeStatus.Handled); + persisted.KeepUntil.HasValue.ShouldBeTrue(); } diff --git a/src/Persistence/Wolverine.EntityFrameworkCore/Internals/IncomingMessage.cs b/src/Persistence/Wolverine.EntityFrameworkCore/Internals/IncomingMessage.cs index ffdb88364..33db5188f 100644 --- a/src/Persistence/Wolverine.EntityFrameworkCore/Internals/IncomingMessage.cs +++ b/src/Persistence/Wolverine.EntityFrameworkCore/Internals/IncomingMessage.cs @@ -15,11 +15,14 @@ public IncomingMessage(Envelope envelope) OwnerId = envelope.OwnerId; ExecutionTime = envelope.ScheduledTime?.ToUniversalTime(); Attempts = envelope.Attempts; - Body = EnvelopeSerializer.Serialize(envelope); + Body = envelope.Status == EnvelopeStatus.Handled ? [] : EnvelopeSerializer.Serialize(envelope); MessageType = envelope.MessageType!; ReceivedAt = envelope.Destination?.ToString(); + KeepUntil = envelope.KeepUntil; } + public DateTimeOffset? KeepUntil { get; set; } + public Guid Id { get; set; } public string Status { get; set; } = EnvelopeStatus.Incoming.ToString(); public int OwnerId { get; set; } diff --git a/src/Persistence/Wolverine.EntityFrameworkCore/WolverineEntityCoreExtensions.cs b/src/Persistence/Wolverine.EntityFrameworkCore/WolverineEntityCoreExtensions.cs index 6809a27f9..4d50e63f0 100644 --- a/src/Persistence/Wolverine.EntityFrameworkCore/WolverineEntityCoreExtensions.cs +++ b/src/Persistence/Wolverine.EntityFrameworkCore/WolverineEntityCoreExtensions.cs @@ -230,6 +230,7 @@ public static ModelBuilder MapWolverineEnvelopeStorage(this ModelBuilder modelBu eb.Property(x => x.Body).HasColumnName(DatabaseConstants.Body).IsRequired(); eb.Property(x => x.MessageType).HasColumnName(DatabaseConstants.MessageType).IsRequired(); eb.Property(x => x.ReceivedAt).HasColumnName(DatabaseConstants.ReceivedAt); + eb.Property(x => x.KeepUntil).HasColumnName(DatabaseConstants.KeepUntil); }); modelBuilder.Entity(eb => diff --git a/src/Persistence/Wolverine.Marten/Persistence/Operations/StoreIncomingEnvelope.cs b/src/Persistence/Wolverine.Marten/Persistence/Operations/StoreIncomingEnvelope.cs index 79f72b4c0..9b8380adb 100644 --- a/src/Persistence/Wolverine.Marten/Persistence/Operations/StoreIncomingEnvelope.cs +++ b/src/Persistence/Wolverine.Marten/Persistence/Operations/StoreIncomingEnvelope.cs @@ -39,6 +39,8 @@ public void ConfigureCommand(ICommandBuilder builder, IMartenSession session) builder.AppendParameter(Envelope.MessageType); builder.Append(','); builder.AppendParameter(Envelope.Destination?.ToString()); + builder.Append(','); + builder.AppendParameter(Envelope.KeepUntil.HasValue ? Envelope.KeepUntil.Value : DBNull.Value); builder.Append(");"); } diff --git a/src/Persistence/Wolverine.RDBMS/DatabaseConstants.cs b/src/Persistence/Wolverine.RDBMS/DatabaseConstants.cs index 316424028..7cc642d98 100644 --- a/src/Persistence/Wolverine.RDBMS/DatabaseConstants.cs +++ b/src/Persistence/Wolverine.RDBMS/DatabaseConstants.cs @@ -42,7 +42,7 @@ public class DatabaseConstants public const string Expires = "expires"; public static readonly string IncomingFields = - $"{Body}, {Id}, {Status}, {OwnerId}, {ExecutionTime}, {Attempts}, {MessageType}, {ReceivedAt}"; + $"{Body}, {Id}, {Status}, {OwnerId}, {ExecutionTime}, {Attempts}, {MessageType}, {ReceivedAt}, {KeepUntil}"; public static readonly string OutgoingFields = $"{Body}, {Id}, {OwnerId}, {Destination}, {DeliverBy}, {Attempts}, {MessageType}"; diff --git a/src/Persistence/Wolverine.RDBMS/DatabasePersistence.cs b/src/Persistence/Wolverine.RDBMS/DatabasePersistence.cs index 5c18aeade..291ef4725 100644 --- a/src/Persistence/Wolverine.RDBMS/DatabasePersistence.cs +++ b/src/Persistence/Wolverine.RDBMS/DatabasePersistence.cs @@ -79,7 +79,8 @@ public static void BuildIncomingStorageCommand(IMessageDatabase settings, DbComm builder.AddParameter(envelope.ScheduledTime), builder.AddParameter(envelope.Attempts), builder.AddParameter(envelope.MessageType), - builder.AddParameter(envelope.Destination?.ToString()) + builder.AddParameter(envelope.Destination?.ToString()), + builder.AddParameter(envelope.KeepUntil) }; var parameterList = list.Select(x => $"@{x.ParameterName}").Join(", "); @@ -107,6 +108,11 @@ public static async Task ReadIncomingAsync(DbDataReader reader, Cancel envelope.Attempts = await reader.GetFieldValueAsync(5, cancellation); + if (!await reader.IsDBNullAsync(8, cancellation)) + { + envelope.KeepUntil = await reader.GetFieldValueAsync(8, cancellation); + } + return envelope; }