diff --git a/src/Persistence/Wolverine.RDBMS/DatabasePersistence.cs b/src/Persistence/Wolverine.RDBMS/DatabasePersistence.cs index dbbf2170d..5c18aeade 100644 --- a/src/Persistence/Wolverine.RDBMS/DatabasePersistence.cs +++ b/src/Persistence/Wolverine.RDBMS/DatabasePersistence.cs @@ -4,6 +4,7 @@ using Microsoft.Extensions.Logging; using Weasel.Core; using Wolverine.Persistence.Durability; +using Wolverine.Runtime; using Wolverine.Runtime.Serialization; using DbCommandBuilder = Weasel.Core.DbCommandBuilder; @@ -66,9 +67,12 @@ public static DbCommand BuildIncomingStorageCommand(IEnumerable envelo public static void BuildIncomingStorageCommand(IMessageDatabase settings, DbCommandBuilder builder, Envelope envelope) { + // Don't store any data if the envelope is already marked as handled + var data = envelope.Status == EnvelopeStatus.Handled ? [] : EnvelopeSerializer.Serialize(envelope); + var list = new List { - builder.AddParameter(EnvelopeSerializer.Serialize(envelope)), + builder.AddParameter(data), builder.AddParameter(envelope.Id), builder.AddParameter(envelope.Status.ToString()), builder.AddParameter(envelope.OwnerId), @@ -87,9 +91,14 @@ public static void BuildIncomingStorageCommand(IMessageDatabase settings, DbComm public static async Task ReadIncomingAsync(DbDataReader reader, CancellationToken cancellation = default) { var body = await reader.GetFieldValueAsync(0, cancellation); - var envelope = EnvelopeSerializer.Deserialize(body); + var envelope = body.Length > 0 ? EnvelopeSerializer.Deserialize(body) : new Envelope{Message = new PlaceHolder()}; + envelope.Id = await reader.GetFieldValueAsync(1, cancellation); envelope.Status = Enum.Parse(await reader.GetFieldValueAsync(2, cancellation)); envelope.OwnerId = await reader.GetFieldValueAsync(3, cancellation); + envelope.MessageType = await reader.GetFieldValueAsync(6, cancellation); + + var rawUri = await reader.GetFieldValueAsync(7, cancellation); + envelope.Destination = new Uri(rawUri); if (!await reader.IsDBNullAsync(4, cancellation)) { diff --git a/src/Testing/Wolverine.ComplianceTests/MessageStoreCompliance.cs b/src/Testing/Wolverine.ComplianceTests/MessageStoreCompliance.cs index 8180aefc3..532a2319a 100644 --- a/src/Testing/Wolverine.ComplianceTests/MessageStoreCompliance.cs +++ b/src/Testing/Wolverine.ComplianceTests/MessageStoreCompliance.cs @@ -105,6 +105,28 @@ public async Task store_a_single_incoming_envelope() stored.SentAt.ShouldBe(envelope.SentAt); } + + [Fact] + public async Task store_a_single_incoming_envelope_that_is_handled() + { + // This is for cases where you're only persisting the record for idempotency checks + + var envelope = ObjectMother.Envelope(); + envelope.Status = EnvelopeStatus.Handled; + envelope.SentAt = ((DateTimeOffset)DateTime.Today).ToUniversalTime(); + + await thePersistence.Inbox.StoreIncomingAsync(envelope); + + var stored = (await thePersistence.Admin.AllIncomingAsync()).Single(); + + // This is the important part + stored.Data.Length.ShouldBe(0); + stored.Destination.ShouldBe(envelope.Destination); + + stored.Id.ShouldBe(envelope.Id); + stored.OwnerId.ShouldBe(envelope.OwnerId); + stored.Status.ShouldBe(envelope.Status); + } [Fact] public async Task store_a_single_incoming_envelope_that_is_a_duplicate() diff --git a/src/Wolverine/Runtime/PlaceHolder.cs b/src/Wolverine/Runtime/PlaceHolder.cs new file mode 100644 index 000000000..4c0fcce32 --- /dev/null +++ b/src/Wolverine/Runtime/PlaceHolder.cs @@ -0,0 +1,17 @@ +namespace Wolverine.Runtime; + +/// +/// Just a place holder for the real message +/// +public class PlaceHolder : ISerializable +{ + public byte[] Write() + { + return []; + } + + public static object Read(byte[] bytes) + { + return new PlaceHolder(); + } +} \ No newline at end of file