Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions src/Persistence/Wolverine.RDBMS/DatabasePersistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Microsoft.Extensions.Logging;
using Weasel.Core;
using Wolverine.Persistence.Durability;
using Wolverine.Runtime;
using Wolverine.Runtime.Serialization;
using DbCommandBuilder = Weasel.Core.DbCommandBuilder;

Expand Down Expand Up @@ -66,9 +67,12 @@ public static DbCommand BuildIncomingStorageCommand(IEnumerable<Envelope> envelo
public static void BuildIncomingStorageCommand(IMessageDatabase settings, DbCommandBuilder builder,
Envelope envelope)
{
// Don't store any data if the envelope is already marked as handled
var data = envelope.Status == EnvelopeStatus.Handled ? [] : EnvelopeSerializer.Serialize(envelope);

var list = new List<DbParameter>
{
builder.AddParameter(EnvelopeSerializer.Serialize(envelope)),
builder.AddParameter(data),
builder.AddParameter(envelope.Id),
builder.AddParameter(envelope.Status.ToString()),
builder.AddParameter(envelope.OwnerId),
Expand All @@ -87,9 +91,14 @@ public static void BuildIncomingStorageCommand(IMessageDatabase settings, DbComm
public static async Task<Envelope> ReadIncomingAsync(DbDataReader reader, CancellationToken cancellation = default)
{
var body = await reader.GetFieldValueAsync<byte[]>(0, cancellation);
var envelope = EnvelopeSerializer.Deserialize(body);
var envelope = body.Length > 0 ? EnvelopeSerializer.Deserialize(body) : new Envelope{Message = new PlaceHolder()};
envelope.Id = await reader.GetFieldValueAsync<Guid>(1, cancellation);
envelope.Status = Enum.Parse<EnvelopeStatus>(await reader.GetFieldValueAsync<string>(2, cancellation));
envelope.OwnerId = await reader.GetFieldValueAsync<int>(3, cancellation);
envelope.MessageType = await reader.GetFieldValueAsync<string>(6, cancellation);

var rawUri = await reader.GetFieldValueAsync<string>(7, cancellation);
envelope.Destination = new Uri(rawUri);

if (!await reader.IsDBNullAsync(4, cancellation))
{
Expand Down
22 changes: 22 additions & 0 deletions src/Testing/Wolverine.ComplianceTests/MessageStoreCompliance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,28 @@ public async Task store_a_single_incoming_envelope()

stored.SentAt.ShouldBe(envelope.SentAt);
}

[Fact]
public async Task store_a_single_incoming_envelope_that_is_handled()
{
// This is for cases where you're only persisting the record for idempotency checks

var envelope = ObjectMother.Envelope();
envelope.Status = EnvelopeStatus.Handled;
envelope.SentAt = ((DateTimeOffset)DateTime.Today).ToUniversalTime();

await thePersistence.Inbox.StoreIncomingAsync(envelope);

var stored = (await thePersistence.Admin.AllIncomingAsync()).Single();

// This is the important part
stored.Data.Length.ShouldBe(0);
stored.Destination.ShouldBe(envelope.Destination);

stored.Id.ShouldBe(envelope.Id);
stored.OwnerId.ShouldBe(envelope.OwnerId);
stored.Status.ShouldBe(envelope.Status);
}

[Fact]
public async Task store_a_single_incoming_envelope_that_is_a_duplicate()
Expand Down
17 changes: 17 additions & 0 deletions src/Wolverine/Runtime/PlaceHolder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace Wolverine.Runtime;

/// <summary>
/// Just a place holder for the real message
/// </summary>
public class PlaceHolder : ISerializable
{
public byte[] Write()
{
return [];
}

public static object Read(byte[] bytes)
{
return new PlaceHolder();
}
}
Loading