Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions mise.toml

This file was deleted.

3 changes: 2 additions & 1 deletion src/Persistence/MySql/Wolverine.MySql/MySqlMessageStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ protected override bool isExceptionFromDuplicateEnvelope(Exception ex)
{
if (ex is MySqlException mySqlException)
{
return mySqlException.Number == 1062;
if (mySqlException.Number == 1062) return true;
return mySqlException.Message.Contains("Duplicate entry");
}

return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public async Task StoreIncomingAsync(IReadOnlyList<Envelope> envelopes)
await using var conn = await _dataSource.OpenConnectionAsync(_cancellation);
var tx = (OracleTransaction)await conn.BeginTransactionAsync(_cancellation);

var duplicates = new List<Envelope>();

foreach (var envelope in envelopes)
{
var data = envelope.Status == EnvelopeStatus.Handled
Expand Down Expand Up @@ -81,12 +83,17 @@ public async Task StoreIncomingAsync(IReadOnlyList<Envelope> envelopes)
}
catch (OracleException e) when (e.Number == 1)
{
// Idempotent
duplicates.Add(envelope);
}
}

await tx.CommitAsync(_cancellation);
await conn.CloseAsync();

if (duplicates.Count > 0)
{
throw new DuplicateIncomingEnvelopeException(duplicates);
}
}

public async Task<bool> ExistsAsync(Envelope envelope, CancellationToken cancellation)
Expand Down Expand Up @@ -311,6 +318,8 @@ public async Task ReassignIncomingAsync(int ownerId, IReadOnlyList<Envelope> inc
// IMessageDatabase
public async Task StoreIncomingAsync(DbTransaction tx, Envelope[] envelopes)
{
var duplicates = new List<Envelope>();

foreach (var envelope in envelopes)
{
var data = envelope.Status == EnvelopeStatus.Handled
Expand Down Expand Up @@ -338,8 +347,13 @@ public async Task StoreIncomingAsync(DbTransaction tx, Envelope[] envelopes)
}
catch (OracleException e) when (e.Number == 1)
{
// Idempotent
duplicates.Add(envelope);
}
}

if (duplicates.Count > 0)
{
throw new DuplicateIncomingEnvelopeException(duplicates);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ protected override bool isExceptionFromDuplicateEnvelope(Exception ex)
{
if (ex is PostgresException postgresException)
{
return
postgresException.Message.Contains("duplicate key value violates unique constraint");
if (postgresException.SqlState == "23505") return true;
return postgresException.Message.Contains("duplicate key value violates unique constraint");
}

return false;
Expand Down
80 changes: 72 additions & 8 deletions src/Persistence/Wolverine.RDBMS/MessageDatabase.Incoming.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,21 @@ public Task ReassignIncomingAsync(int ownerId, IReadOnlyList<Envelope> incoming)
return executeCommandBatch(builder, _cancellation);
}

public Task StoreIncomingAsync(DbTransaction tx, Envelope[] envelopes)
public async Task StoreIncomingAsync(DbTransaction tx, Envelope[] envelopes)
{
var cmd = DatabasePersistence.BuildIncomingStorageCommand(envelopes, this);

cmd.Transaction = tx;
cmd.Connection = tx.Connection;

return cmd.ExecuteNonQueryAsync(_cancellation);
try
{
await cmd.ExecuteNonQueryAsync(_cancellation);
}
catch (Exception e) when (IsDuplicateEnvelopeException(e))
{
throw new DuplicateIncomingEnvelopeException(envelopes);
}
}

public async Task MoveToDeadLetterStorageAsync(Envelope envelope, Exception? exception)
Expand All @@ -68,7 +75,7 @@ public async Task MoveToDeadLetterStorageAsync(Envelope envelope, Exception? exc
}
catch (Exception e)
{
if (isExceptionFromDuplicateEnvelope(e)) return;
if (IsDuplicateEnvelopeException(e)) return;
throw;
}
}
Expand Down Expand Up @@ -155,7 +162,7 @@ public async Task StoreIncomingAsync(Envelope envelope)
}
catch (Exception e)
{
if (isExceptionFromDuplicateEnvelope(e))
if (IsDuplicateEnvelopeException(e))
{
throw new DuplicateIncomingEnvelopeException(envelope);
}
Expand All @@ -166,21 +173,78 @@ public async Task StoreIncomingAsync(Envelope envelope)

public async Task StoreIncomingAsync(IReadOnlyList<Envelope> envelopes)
{
if (envelopes.Count == 0) return;

var cmd = DatabasePersistence.BuildIncomingStorageCommand(envelopes, this);

await using var conn = await _dataSource.OpenConnectionAsync(_cancellation);
try
{

cmd.Connection = conn;

await cmd.ExecuteNonQueryAsync(_cancellation);
// Wrap the multi-statement batch in an explicit transaction so the
// semantics are uniform across drivers: SqlClient/MySqlConnector/
// Microsoft.Data.Sqlite autocommit per statement otherwise, which
// would partially persist the batch on a duplicate-key failure and
// leave the inbox in a state that is indistinguishable from
// "envelope was already there". Npgsql already does this implicitly,
// but being explicit costs nothing and removes a per-driver footgun.
await using var tx = await conn.BeginTransactionAsync(_cancellation);
try
{
cmd.Connection = conn;
cmd.Transaction = tx;
await cmd.ExecuteNonQueryAsync(_cancellation);
await tx.CommitAsync(_cancellation);
}
catch (Exception e) when (IsDuplicateEnvelopeException(e))
{
await tx.RollbackAsync(_cancellation);

// Now that the batch is guaranteed rolled back, identify exactly
// which envelopes were already present via id-existence. Callers
// can retry the rest per-envelope.
var duplicates = new List<Envelope>();
foreach (var envelope in envelopes)
{
if (await ExistsAsync(envelope, _cancellation).ConfigureAwait(false))
{
duplicates.Add(envelope);
}
}

if (duplicates.Count == 0)
{
// Backend reported a duplicate-key error but no envelope id
// matches an existing row. Surface the original failure
// rather than silently swallowing it.
throw;
}

throw new DuplicateIncomingEnvelopeException(duplicates);
}
}
finally
{
await conn.CloseAsync();
}
}

protected bool IsDuplicateEnvelopeException(Exception ex)
{
for (var current = ex; current != null; current = current.InnerException)
{
if (isExceptionFromDuplicateEnvelope(current)) return true;
}

if (ex is AggregateException agg)
{
foreach (var inner in agg.InnerExceptions)
{
if (IsDuplicateEnvelopeException(inner)) return true;
}
}

return false;
}

protected abstract bool isExceptionFromDuplicateEnvelope(Exception ex);
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,14 @@ public async Task<IReadOnlyList<DbObjectName>> SchemaTables(CancellationToken ct

protected override bool isExceptionFromDuplicateEnvelope(Exception ex)
{
return ex is SqlException sqlEx && sqlEx.Message.ContainsIgnoreCase("Violation of PRIMARY KEY constraint");
if (ex is SqlException sqlEx)
{
if (sqlEx.Number == 2627 || sqlEx.Number == 2601) return true;
return sqlEx.Message.ContainsIgnoreCase("Violation of PRIMARY KEY constraint")
|| sqlEx.Message.ContainsIgnoreCase("Violation of UNIQUE KEY constraint");
}

return false;
}

protected override void writePagingAfter(DbCommandBuilder builder, int offset, int limit)
Expand Down
13 changes: 11 additions & 2 deletions src/Persistence/Wolverine.Sqlite/SqliteMessageStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,17 @@ protected override bool isExceptionFromDuplicateEnvelope(Exception ex)
{
if (ex is SqliteException sqliteException)
{
return sqliteException.SqliteErrorCode == 19 || // SQLITE_CONSTRAINT
sqliteException.Message.Contains("UNIQUE constraint failed");
// SQLITE_CONSTRAINT_PRIMARYKEY (1555) or SQLITE_CONSTRAINT_UNIQUE (2067)
if (sqliteException.SqliteExtendedErrorCode == 1555
|| sqliteException.SqliteExtendedErrorCode == 2067)
{
return true;
}

// Fallback: SQLITE_CONSTRAINT (19) plus message-match
return sqliteException.SqliteErrorCode == 19
&& (sqliteException.Message.Contains("UNIQUE constraint failed")
|| sqliteException.Message.Contains("PRIMARY KEY"));
}

return false;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
using NSubstitute;
using NSubstitute.ExceptionExtensions;
using Wolverine.ComplianceTests;
using Wolverine.Persistence.Durability;
using Wolverine.Runtime;
using Wolverine.Runtime.WorkerQueues;
using Wolverine.Transports;
using Wolverine.Transports.Stub;
using Xunit;

namespace CoreTests.Runtime.WorkerQueues;

public class when_durable_receiver_detects_duplicate_incoming_envelopes_in_batch : IAsyncLifetime
{
private readonly Envelope theDuplicate = ObjectMother.Envelope();
private readonly Envelope theFreshEnvelope = ObjectMother.Envelope();
private readonly IListener theListener = Substitute.For<IListener>();
private readonly IHandlerPipeline thePipeline = Substitute.For<IHandlerPipeline>();
private readonly DurableReceiver theReceiver;
private readonly MockWolverineRuntime theRuntime;

public when_durable_receiver_detects_duplicate_incoming_envelopes_in_batch()
{
theRuntime = new MockWolverineRuntime();
var stubEndpoint = new StubEndpoint("one", new StubTransport());
theReceiver = new DurableReceiver(stubEndpoint, theRuntime, thePipeline);

// The batch insert fails with a typed duplicate exception. DurableReceiver
// re-posts every envelope through the per-envelope path, where the single
// StoreIncomingAsync correctly distinguishes the actual duplicate from the
// fresh one.
theRuntime.Storage.Inbox
.StoreIncomingAsync(Arg.Any<IReadOnlyList<Envelope>>())
.Throws(new DuplicateIncomingEnvelopeException(new[] { theDuplicate }));

theRuntime.Storage.Inbox
.StoreIncomingAsync(theDuplicate)
.Throws(new DuplicateIncomingEnvelopeException(theDuplicate));

theRuntime.Storage.Inbox
.StoreIncomingAsync(theFreshEnvelope)
.Returns(Task.CompletedTask);
}

public async Task InitializeAsync()
{
var now = DateTimeOffset.UtcNow;
await theReceiver.ProcessReceivedMessagesAsync(now, theListener, new[] { theDuplicate, theFreshEnvelope });
await theReceiver.DrainAsync();
}

public Task DisposeAsync() => Task.CompletedTask;

[Fact]
public async Task the_duplicate_listener_was_completed()
{
await theListener.Received().CompleteAsync(theDuplicate);
}

[Fact]
public async Task the_duplicate_envelope_was_not_processed()
{
await thePipeline.DidNotReceive().InvokeAsync(theDuplicate, theReceiver);
}

[Fact]
public async Task the_fresh_envelope_was_re_attempted_through_the_per_envelope_path()
{
// The per-envelope StoreIncomingAsync is the deduplication checkpoint:
// a duplicate throws and is completed at the listener; a fresh envelope
// succeeds and the receiver enqueues it for the pipeline. Asserting at
// the inbox layer verifies the fall-back contract directly, without
// depending on the receiver's downstream Dataflow drain ordering.
await theRuntime.Storage.Inbox.Received().StoreIncomingAsync(theFreshEnvelope);
}
}
29 changes: 29 additions & 0 deletions src/Testing/Wolverine.ComplianceTests/MessageStoreCompliance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,35 @@ await Should.ThrowAsync<DuplicateIncomingEnvelopeException>(async () =>
});
}

[Fact]
public async Task bulk_store_intra_batch_duplicate_reports_only_actual_duplicates()
{
var existing = ObjectMother.Envelope();
existing.Destination = new Uri("stub://incoming-bulk-dup");
existing.Status = EnvelopeStatus.Incoming;
await thePersistence.Inbox.StoreIncomingAsync(existing);

var fresh1 = ObjectMother.Envelope();
fresh1.Destination = existing.Destination;
fresh1.Status = EnvelopeStatus.Incoming;

var fresh2 = ObjectMother.Envelope();
fresh2.Destination = existing.Destination;
fresh2.Status = EnvelopeStatus.Incoming;

var batch = new[] { fresh1, existing, fresh2 };

var ex = await Should.ThrowAsync<DuplicateIncomingEnvelopeException>(
() => thePersistence.Inbox.StoreIncomingAsync(batch));

// Only the actually-existing envelope is reported as a duplicate.
// Fresh envelopes must NOT appear in Duplicates — otherwise DurableReceiver
// would route them straight to listener.CompleteAsync and the handler would
// never run for legitimate messages.
ex.Duplicates.Count.ShouldBe(1);
ex.Duplicates.Single().Id.ShouldBe(existing.Id);
}

[Fact]
public async Task store_a_single_outgoing_envelope()
{
Expand Down
Loading
Loading