Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
using IntegrationTests;
using Microsoft.Data.SqlClient;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using NSubstitute;
using SharedPersistenceModels.Items;
using Shouldly;
using Weasel.SqlServer;
using Weasel.SqlServer.Tables;
using Wolverine;
using Wolverine.ComplianceTests;
using Wolverine.EntityFrameworkCore.Internals;
using Wolverine.RDBMS;
using Wolverine.Runtime;
using Wolverine.Tracking;
using Wolverine.Transports;
Expand Down Expand Up @@ -39,7 +43,7 @@ public async Task happy_path_eager_idempotency()

var transaction = new EfCoreEnvelopeTransaction(dbContext, context);

var ok = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, CancellationToken.None);
var ok = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, new DurabilitySettings(), CancellationToken.None);
ok.ShouldBeTrue();

await dbContext.Database.CurrentTransaction!.CommitAsync();
Expand All @@ -49,6 +53,18 @@ public async Task happy_path_eager_idempotency()
persisted.Destination.ShouldBe(envelope.Destination);
persisted.MessageType.ShouldBe(envelope.MessageType);
persisted.Status.ShouldBe(EnvelopeStatus.Handled);
persisted.KeepUntil.HasValue.ShouldBeTrue();

using var conn = new SqlConnection(Servers.SqlServerConnectionString);
await conn.OpenAsync();

var raw = await conn
.CreateCommand($"select keep_until from idempotency.{DatabaseConstants.IncomingTable} where id = @id")
.With("id", persisted.Id)
.ExecuteScalarAsync();

raw.ShouldNotBeNull();
raw.ShouldBeOfType<DateTimeOffset>().ShouldBeGreaterThan(DateTimeOffset.UtcNow);

}

Expand All @@ -66,14 +82,15 @@ public async Task sad_path_eager_idempotency()

var transaction = new EfCoreEnvelopeTransaction(dbContext, context);

var ok = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, CancellationToken.None);
var durabilitySettings = new DurabilitySettings();
var ok = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, durabilitySettings, CancellationToken.None);
ok.ShouldBeTrue();
await dbContext.Database.CurrentTransaction!.CommitAsync();

// Kind of resetting it here
envelope.WasPersistedInInbox = false;

var secondTime = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, CancellationToken.None);
var secondTime = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, durabilitySettings, CancellationToken.None);
secondTime.ShouldBeFalse();


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using Wolverine.EntityFrameworkCore;
using Wolverine.EntityFrameworkCore.Internals;
using Wolverine.Persistence;
using Wolverine.RDBMS;
using Wolverine.Runtime;
using Wolverine.SqlServer;
using Wolverine.Tracking;
Expand Down Expand Up @@ -73,19 +74,28 @@ public async Task happy_path_eager_idempotency()

var transaction = new EfCoreEnvelopeTransaction(dbContext, context);

var ok = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, CancellationToken.None);
var ok = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, new DurabilitySettings(), CancellationToken.None);
ok.ShouldBeTrue();

await dbContext.Database.CurrentTransaction!.CommitAsync();

var all = await runtime.Storage.Admin.AllIncomingAsync();

var persisted = (await runtime.Storage.Admin.AllIncomingAsync()).Single(x => x.Id == envelope.Id);
persisted.Data.Length.ShouldBe(0);
persisted.Destination.ShouldBe(envelope.Destination);
persisted.MessageType.ShouldBe(envelope.MessageType);
persisted.Status.ShouldBe(EnvelopeStatus.Handled);

persisted.KeepUntil.HasValue.ShouldBeTrue();

using var conn = new SqlConnection(Servers.SqlServerConnectionString);
await conn.OpenAsync();

var raw = await conn
.CreateCommand($"select keep_until from idempotency.{DatabaseConstants.IncomingTable} where id = @id")
.With("id", persisted.Id)
.ExecuteScalarAsync();

raw.ShouldNotBeNull();
raw.ShouldBeOfType<DateTimeOffset>().ShouldBeGreaterThan(DateTimeOffset.UtcNow);
}

[Fact]
Expand All @@ -102,14 +112,15 @@ public async Task sad_path_eager_idempotency()

var transaction = new EfCoreEnvelopeTransaction(dbContext, context);

var ok = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, CancellationToken.None);
var durabilitySettings = new DurabilitySettings();
var ok = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, durabilitySettings, CancellationToken.None);
ok.ShouldBeTrue();
await dbContext.Database.CurrentTransaction!.CommitAsync();

// Kind of resetting it here
envelope.WasPersistedInInbox = false;

var secondTime = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, CancellationToken.None);
var secondTime = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, durabilitySettings, CancellationToken.None);
secondTime.ShouldBeFalse();


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,15 @@ public async Task happy_path_eager_idempotency()
using var session = _host.DocumentStore().LightweightSession();
var transaction = new MartenEnvelopeTransaction(session, context);

var ok = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, CancellationToken.None);
var ok = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, new DurabilitySettings(), CancellationToken.None);
ok.ShouldBeTrue();

var persisted = (await runtime.Storage.Admin.AllIncomingAsync()).Single(x => x.Id == envelope.Id);
persisted.Data.Length.ShouldBe(0);
persisted.Destination.ShouldBe(envelope.Destination);
persisted.MessageType.ShouldBe(envelope.MessageType);
persisted.Status.ShouldBe(EnvelopeStatus.Handled);
persisted.KeepUntil.HasValue.ShouldBeTrue();

}

Expand All @@ -77,16 +78,14 @@ public async Task sad_path_eager_idempotency()
using var session = _host.DocumentStore().LightweightSession();
var transaction = new MartenEnvelopeTransaction(session, context);

var ok = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, CancellationToken.None);
var ok = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, new DurabilitySettings(), CancellationToken.None);
ok.ShouldBeTrue();

// Kind of resetting it here
envelope.WasPersistedInInbox = false;

var secondTime = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, CancellationToken.None);
var secondTime = await transaction.TryMakeEagerIdempotencyCheckAsync(envelope, new DurabilitySettings(), CancellationToken.None);
secondTime.ShouldBeFalse();


}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class EfCoreEnvelopeTransaction : IEnvelopeTransaction
{
private readonly MessageContext _messaging;
private readonly IMessageDatabase _database;

public EfCoreEnvelopeTransaction(DbContext dbContext, MessageContext messaging)
{
_messaging = messaging;
Expand Down Expand Up @@ -125,7 +125,8 @@ public ValueTask RollbackAsync()
return ValueTask.CompletedTask;
}

public async Task<bool> TryMakeEagerIdempotencyCheckAsync(Envelope envelope, CancellationToken cancellation)
public async Task<bool> TryMakeEagerIdempotencyCheckAsync(Envelope envelope, DurabilitySettings settings,
CancellationToken cancellation)
{
if (envelope.WasPersistedInInbox) return true;

Expand All @@ -136,7 +137,7 @@ public async Task<bool> TryMakeEagerIdempotencyCheckAsync(Envelope envelope, Can

try
{
var copy = Envelope.ForPersistedHandled(envelope);
var copy = Envelope.ForPersistedHandled(envelope, DateTimeOffset.UtcNow, settings);
await PersistIncomingAsync(copy);

// Gotta flush the call to the database!
Expand Down Expand Up @@ -181,7 +182,7 @@ public async ValueTask CommitAsync(CancellationToken cancellation)
// handled messages for the sake of idempotency
else
{
var envelope = Envelope.ForPersistedHandled(_messaging.Envelope);
var envelope = Envelope.ForPersistedHandled(_messaging.Envelope, DateTimeOffset.UtcNow, _messaging.Runtime.Options.Durability);
await PersistIncomingAsync(envelope);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ public IncomingMessage(Envelope envelope)
OwnerId = envelope.OwnerId;
ExecutionTime = envelope.ScheduledTime?.ToUniversalTime();
Attempts = envelope.Attempts;
Body = EnvelopeSerializer.Serialize(envelope);
Body = envelope.Status == EnvelopeStatus.Handled ? [] : EnvelopeSerializer.Serialize(envelope);
MessageType = envelope.MessageType!;
ReceivedAt = envelope.Destination?.ToString();
KeepUntil = envelope.KeepUntil;
}

public DateTimeOffset? KeepUntil { get; set; }

public Guid Id { get; set; }
public string Status { get; set; } = EnvelopeStatus.Incoming.ToString();
public int OwnerId { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ public static ModelBuilder MapWolverineEnvelopeStorage(this ModelBuilder modelBu
eb.Property(x => x.Body).HasColumnName(DatabaseConstants.Body).IsRequired();
eb.Property(x => x.MessageType).HasColumnName(DatabaseConstants.MessageType).IsRequired();
eb.Property(x => x.ReceivedAt).HasColumnName(DatabaseConstants.ReceivedAt);
eb.Property(x => x.KeepUntil).HasColumnName(DatabaseConstants.KeepUntil);
});

modelBuilder.Entity<OutgoingMessage>(eb =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public override Task BeforeSaveChangesAsync(IDocumentSession session, Cancellati
}
else
{
var envelope = Envelope.ForPersistedHandled(_context.Envelope);
var envelope = Envelope.ForPersistedHandled(_context.Envelope, DateTimeOffset.UtcNow, _context.Runtime.Options.Durability);
session.QueueOperation(new StoreIncomingEnvelope(_messageStore.IncomingFullName, envelope));
}

Expand Down
5 changes: 3 additions & 2 deletions src/Persistence/Wolverine.Marten/MartenEnvelopeTransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,16 @@ public ValueTask RollbackAsync()
return ValueTask.CompletedTask;
}

public async Task<bool> TryMakeEagerIdempotencyCheckAsync(Envelope envelope, CancellationToken cancellation)
public async Task<bool> TryMakeEagerIdempotencyCheckAsync(Envelope envelope, DurabilitySettings settings,
CancellationToken cancellation)
{
if (envelope.WasPersistedInInbox) return true;

try
{
// Might need to reset!
_context.MultiFlushMode = MultiFlushMode.AllowMultiples;
var copy = Envelope.ForPersistedHandled(envelope);
var copy = Envelope.ForPersistedHandled(envelope, DateTimeOffset.UtcNow, settings);
await PersistIncomingAsync(copy);
await Session.SaveChangesAsync(cancellation);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public void ConfigureCommand(ICommandBuilder builder, IMartenSession session)
builder.AppendParameter(Envelope.MessageType);
builder.Append(',');
builder.AppendParameter(Envelope.Destination?.ToString());
builder.Append(',');
builder.AppendParameter(Envelope.KeepUntil.HasValue ? Envelope.KeepUntil.Value : DBNull.Value);
builder.Append(");");
}

Expand Down
2 changes: 1 addition & 1 deletion src/Persistence/Wolverine.RDBMS/DatabaseConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class DatabaseConstants
public const string Expires = "expires";

public static readonly string IncomingFields =
$"{Body}, {Id}, {Status}, {OwnerId}, {ExecutionTime}, {Attempts}, {MessageType}, {ReceivedAt}";
$"{Body}, {Id}, {Status}, {OwnerId}, {ExecutionTime}, {Attempts}, {MessageType}, {ReceivedAt}, {KeepUntil}";

public static readonly string OutgoingFields =
$"{Body}, {Id}, {OwnerId}, {Destination}, {DeliverBy}, {Attempts}, {MessageType}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ public Task PersistIncomingAsync(Envelope envelope)
return _database.StoreIncomingAsync(_tx, [envelope]);
}

public async Task<bool> TryMakeEagerIdempotencyCheckAsync(Envelope envelope, CancellationToken cancellation)
public async Task<bool> TryMakeEagerIdempotencyCheckAsync(Envelope envelope, DurabilitySettings settings,
CancellationToken cancellation)
{
var copy = Envelope.ForPersistedHandled(envelope);
var copy = Envelope.ForPersistedHandled(envelope, DateTimeOffset.UtcNow, settings);
try
{
await PersistIncomingAsync(copy);
Expand Down
8 changes: 7 additions & 1 deletion src/Persistence/Wolverine.RDBMS/DatabasePersistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public static void BuildIncomingStorageCommand(IMessageDatabase settings, DbComm
builder.AddParameter(envelope.ScheduledTime),
builder.AddParameter(envelope.Attempts),
builder.AddParameter(envelope.MessageType),
builder.AddParameter(envelope.Destination?.ToString())
builder.AddParameter(envelope.Destination?.ToString()),
builder.AddParameter(envelope.KeepUntil)
};

var parameterList = list.Select(x => $"@{x.ParameterName}").Join(", ");
Expand Down Expand Up @@ -107,6 +108,11 @@ public static async Task<Envelope> ReadIncomingAsync(DbDataReader reader, Cancel

envelope.Attempts = await reader.GetFieldValueAsync<int>(5, cancellation);

if (!await reader.IsDBNullAsync(8, cancellation))
{
envelope.KeepUntil = await reader.GetFieldValueAsync<DateTimeOffset>(8, cancellation);
}

return envelope;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ public RavenDbEnvelopeTransaction(IAsyncDocumentSession session, MessageContext
Session = session;
}

public async Task<bool> TryMakeEagerIdempotencyCheckAsync(Envelope envelope, CancellationToken cancellation)
public async Task<bool> TryMakeEagerIdempotencyCheckAsync(Envelope envelope, DurabilitySettings settings,
CancellationToken cancellation)
{
var copy = Envelope.ForPersistedHandled(envelope);
var copy = Envelope.ForPersistedHandled(envelope, DateTimeOffset.UtcNow, settings);
try
{
await PersistIncomingAsync(copy);
Expand Down
52 changes: 52 additions & 0 deletions src/Testing/CoreTests/EnvelopeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -569,4 +569,56 @@ public void the_content_type_should_be_binary_envelope()
theScheduledEnvelope.ContentType.ShouldBe(TransportConstants.SerializedEnvelope);
}
}

public class when_building_an_envelope_for_persisted_handled
{
private readonly Envelope theOriginal;
private readonly DateTimeOffset now;
private readonly DurabilitySettings theSettings;
private readonly Envelope theHandledEnvelope;

public when_building_an_envelope_for_persisted_handled()
{
theOriginal = ObjectMother.Envelope();
theOriginal.Status = EnvelopeStatus.Incoming;

now = DateTime.Today.ToUniversalTime();
theSettings = new DurabilitySettings
{
KeepAfterMessageHandling = 5.Minutes()
};

theHandledEnvelope = Envelope.ForPersistedHandled(theOriginal, now, theSettings);
}

[Fact]
public void status_should_be_handled()
{
theHandledEnvelope.Status.ShouldBe(EnvelopeStatus.Handled);
}

[Fact]
public void keep_until_should_be_set()
{
theHandledEnvelope.KeepUntil.Value.ShouldBe(now.AddMinutes(5));
}

[Fact]
public void owner_is_any_node()
{
theHandledEnvelope.OwnerId.ShouldBe(0);
}

[Fact]
public void destination()
{
theHandledEnvelope.Destination.ShouldBe(theOriginal.Destination);
}

[Fact]
public void message_type()
{
theHandledEnvelope.MessageType.ShouldBe(theOriginal.MessageType);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ public serialization_and_deserialization_of_single_message()
DeliverBy = DateTime.Today.ToUniversalTime(),
ReplyUri = "tcp://localhost:2221/replies".ToUri(),
SagaId = Guid.NewGuid().ToString(),
ParentId = Guid.NewGuid().ToString()
ParentId = Guid.NewGuid().ToString(),
KeepUntil = DateTime.Today.AddDays(1).ToUniversalTime()
};

outgoing.Headers.Add("name", "Jeremy");
Expand Down Expand Up @@ -222,4 +223,10 @@ public void partition_key()
outgoing.PartitionKey = Guid.NewGuid().ToString();
incoming.PartitionKey.ShouldBe(outgoing.PartitionKey);
}

[Fact]
public void keep_until()
{
incoming.KeepUntil.Value.ShouldBe(outgoing.KeepUntil.Value);
}
}
Loading
Loading