diff --git a/mise.toml b/mise.toml deleted file mode 100644 index 43e1ed2e3..000000000 --- a/mise.toml +++ /dev/null @@ -1,2 +0,0 @@ -[tools] -dotnet = "9.0" diff --git a/src/Persistence/MySql/Wolverine.MySql/MySqlMessageStore.cs b/src/Persistence/MySql/Wolverine.MySql/MySqlMessageStore.cs index fb7e6d296..7db59581f 100644 --- a/src/Persistence/MySql/Wolverine.MySql/MySqlMessageStore.cs +++ b/src/Persistence/MySql/Wolverine.MySql/MySqlMessageStore.cs @@ -74,7 +74,8 @@ protected override bool isExceptionFromDuplicateEnvelope(Exception ex) { if (ex is MySqlException mySqlException) { - return mySqlException.Number == 1062; + if (mySqlException.Number == 1062) return true; + return mySqlException.Message.Contains("Duplicate entry"); } return false; diff --git a/src/Persistence/Oracle/Wolverine.Oracle/OracleMessageStore.Incoming.cs b/src/Persistence/Oracle/Wolverine.Oracle/OracleMessageStore.Incoming.cs index 77dea0f4f..27be224ad 100644 --- a/src/Persistence/Oracle/Wolverine.Oracle/OracleMessageStore.Incoming.cs +++ b/src/Persistence/Oracle/Wolverine.Oracle/OracleMessageStore.Incoming.cs @@ -54,6 +54,8 @@ public async Task StoreIncomingAsync(IReadOnlyList envelopes) await using var conn = await _dataSource.OpenConnectionAsync(_cancellation); var tx = (OracleTransaction)await conn.BeginTransactionAsync(_cancellation); + var duplicates = new List(); + foreach (var envelope in envelopes) { var data = envelope.Status == EnvelopeStatus.Handled @@ -81,12 +83,17 @@ public async Task StoreIncomingAsync(IReadOnlyList envelopes) } catch (OracleException e) when (e.Number == 1) { - // Idempotent + duplicates.Add(envelope); } } await tx.CommitAsync(_cancellation); await conn.CloseAsync(); + + if (duplicates.Count > 0) + { + throw new DuplicateIncomingEnvelopeException(duplicates); + } } public async Task ExistsAsync(Envelope envelope, CancellationToken cancellation) @@ -311,6 +318,8 @@ public async Task ReassignIncomingAsync(int ownerId, IReadOnlyList inc // IMessageDatabase public async Task StoreIncomingAsync(DbTransaction tx, Envelope[] envelopes) { + var duplicates = new List(); + foreach (var envelope in envelopes) { var data = envelope.Status == EnvelopeStatus.Handled @@ -338,8 +347,13 @@ public async Task StoreIncomingAsync(DbTransaction tx, Envelope[] envelopes) } catch (OracleException e) when (e.Number == 1) { - // Idempotent + duplicates.Add(envelope); } } + + if (duplicates.Count > 0) + { + throw new DuplicateIncomingEnvelopeException(duplicates); + } } } diff --git a/src/Persistence/Wolverine.Postgresql/PostgresqlMessageStore.cs b/src/Persistence/Wolverine.Postgresql/PostgresqlMessageStore.cs index aed911a49..3f531854a 100644 --- a/src/Persistence/Wolverine.Postgresql/PostgresqlMessageStore.cs +++ b/src/Persistence/Wolverine.Postgresql/PostgresqlMessageStore.cs @@ -119,8 +119,8 @@ protected override bool isExceptionFromDuplicateEnvelope(Exception ex) { if (ex is PostgresException postgresException) { - return - postgresException.Message.Contains("duplicate key value violates unique constraint"); + if (postgresException.SqlState == "23505") return true; + return postgresException.Message.Contains("duplicate key value violates unique constraint"); } return false; diff --git a/src/Persistence/Wolverine.RDBMS/MessageDatabase.Incoming.cs b/src/Persistence/Wolverine.RDBMS/MessageDatabase.Incoming.cs index 89a5d279e..9d90062e6 100644 --- a/src/Persistence/Wolverine.RDBMS/MessageDatabase.Incoming.cs +++ b/src/Persistence/Wolverine.RDBMS/MessageDatabase.Incoming.cs @@ -34,14 +34,21 @@ public Task ReassignIncomingAsync(int ownerId, IReadOnlyList incoming) return executeCommandBatch(builder, _cancellation); } - public Task StoreIncomingAsync(DbTransaction tx, Envelope[] envelopes) + public async Task StoreIncomingAsync(DbTransaction tx, Envelope[] envelopes) { var cmd = DatabasePersistence.BuildIncomingStorageCommand(envelopes, this); cmd.Transaction = tx; cmd.Connection = tx.Connection; - return cmd.ExecuteNonQueryAsync(_cancellation); + try + { + await cmd.ExecuteNonQueryAsync(_cancellation); + } + catch (Exception e) when (IsDuplicateEnvelopeException(e)) + { + throw new DuplicateIncomingEnvelopeException(envelopes); + } } public async Task MoveToDeadLetterStorageAsync(Envelope envelope, Exception? exception) @@ -68,7 +75,7 @@ public async Task MoveToDeadLetterStorageAsync(Envelope envelope, Exception? exc } catch (Exception e) { - if (isExceptionFromDuplicateEnvelope(e)) return; + if (IsDuplicateEnvelopeException(e)) return; throw; } } @@ -155,7 +162,7 @@ public async Task StoreIncomingAsync(Envelope envelope) } catch (Exception e) { - if (isExceptionFromDuplicateEnvelope(e)) + if (IsDuplicateEnvelopeException(e)) { throw new DuplicateIncomingEnvelopeException(envelope); } @@ -166,15 +173,54 @@ public async Task StoreIncomingAsync(Envelope envelope) public async Task StoreIncomingAsync(IReadOnlyList envelopes) { + if (envelopes.Count == 0) return; + var cmd = DatabasePersistence.BuildIncomingStorageCommand(envelopes, this); await using var conn = await _dataSource.OpenConnectionAsync(_cancellation); try { - - cmd.Connection = conn; - - await cmd.ExecuteNonQueryAsync(_cancellation); + // Wrap the multi-statement batch in an explicit transaction so the + // semantics are uniform across drivers: SqlClient/MySqlConnector/ + // Microsoft.Data.Sqlite autocommit per statement otherwise, which + // would partially persist the batch on a duplicate-key failure and + // leave the inbox in a state that is indistinguishable from + // "envelope was already there". Npgsql already does this implicitly, + // but being explicit costs nothing and removes a per-driver footgun. + await using var tx = await conn.BeginTransactionAsync(_cancellation); + try + { + cmd.Connection = conn; + cmd.Transaction = tx; + await cmd.ExecuteNonQueryAsync(_cancellation); + await tx.CommitAsync(_cancellation); + } + catch (Exception e) when (IsDuplicateEnvelopeException(e)) + { + await tx.RollbackAsync(_cancellation); + + // Now that the batch is guaranteed rolled back, identify exactly + // which envelopes were already present via id-existence. Callers + // can retry the rest per-envelope. + var duplicates = new List(); + foreach (var envelope in envelopes) + { + if (await ExistsAsync(envelope, _cancellation).ConfigureAwait(false)) + { + duplicates.Add(envelope); + } + } + + if (duplicates.Count == 0) + { + // Backend reported a duplicate-key error but no envelope id + // matches an existing row. Surface the original failure + // rather than silently swallowing it. + throw; + } + + throw new DuplicateIncomingEnvelopeException(duplicates); + } } finally { @@ -182,5 +228,23 @@ public async Task StoreIncomingAsync(IReadOnlyList envelopes) } } + protected bool IsDuplicateEnvelopeException(Exception ex) + { + for (var current = ex; current != null; current = current.InnerException) + { + if (isExceptionFromDuplicateEnvelope(current)) return true; + } + + if (ex is AggregateException agg) + { + foreach (var inner in agg.InnerExceptions) + { + if (IsDuplicateEnvelopeException(inner)) return true; + } + } + + return false; + } + protected abstract bool isExceptionFromDuplicateEnvelope(Exception ex); } \ No newline at end of file diff --git a/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs b/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs index c81f1b910..0416e2394 100644 --- a/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs +++ b/src/Persistence/Wolverine.SqlServer/Persistence/SqlServerMessageStore.cs @@ -127,7 +127,14 @@ public async Task> SchemaTables(CancellationToken ct protected override bool isExceptionFromDuplicateEnvelope(Exception ex) { - return ex is SqlException sqlEx && sqlEx.Message.ContainsIgnoreCase("Violation of PRIMARY KEY constraint"); + if (ex is SqlException sqlEx) + { + if (sqlEx.Number == 2627 || sqlEx.Number == 2601) return true; + return sqlEx.Message.ContainsIgnoreCase("Violation of PRIMARY KEY constraint") + || sqlEx.Message.ContainsIgnoreCase("Violation of UNIQUE KEY constraint"); + } + + return false; } protected override void writePagingAfter(DbCommandBuilder builder, int offset, int limit) diff --git a/src/Persistence/Wolverine.Sqlite/SqliteMessageStore.cs b/src/Persistence/Wolverine.Sqlite/SqliteMessageStore.cs index bd2dfcfb0..4502f62a8 100644 --- a/src/Persistence/Wolverine.Sqlite/SqliteMessageStore.cs +++ b/src/Persistence/Wolverine.Sqlite/SqliteMessageStore.cs @@ -90,8 +90,17 @@ protected override bool isExceptionFromDuplicateEnvelope(Exception ex) { if (ex is SqliteException sqliteException) { - return sqliteException.SqliteErrorCode == 19 || // SQLITE_CONSTRAINT - sqliteException.Message.Contains("UNIQUE constraint failed"); + // SQLITE_CONSTRAINT_PRIMARYKEY (1555) or SQLITE_CONSTRAINT_UNIQUE (2067) + if (sqliteException.SqliteExtendedErrorCode == 1555 + || sqliteException.SqliteExtendedErrorCode == 2067) + { + return true; + } + + // Fallback: SQLITE_CONSTRAINT (19) plus message-match + return sqliteException.SqliteErrorCode == 19 + && (sqliteException.Message.Contains("UNIQUE constraint failed") + || sqliteException.Message.Contains("PRIMARY KEY")); } return false; diff --git a/src/Testing/CoreTests/Runtime/WorkerQueues/when_durable_receiver_detects_duplicate_incoming_envelopes_in_batch.cs b/src/Testing/CoreTests/Runtime/WorkerQueues/when_durable_receiver_detects_duplicate_incoming_envelopes_in_batch.cs new file mode 100644 index 000000000..f2f4412b8 --- /dev/null +++ b/src/Testing/CoreTests/Runtime/WorkerQueues/when_durable_receiver_detects_duplicate_incoming_envelopes_in_batch.cs @@ -0,0 +1,76 @@ +using NSubstitute; +using NSubstitute.ExceptionExtensions; +using Wolverine.ComplianceTests; +using Wolverine.Persistence.Durability; +using Wolverine.Runtime; +using Wolverine.Runtime.WorkerQueues; +using Wolverine.Transports; +using Wolverine.Transports.Stub; +using Xunit; + +namespace CoreTests.Runtime.WorkerQueues; + +public class when_durable_receiver_detects_duplicate_incoming_envelopes_in_batch : IAsyncLifetime +{ + private readonly Envelope theDuplicate = ObjectMother.Envelope(); + private readonly Envelope theFreshEnvelope = ObjectMother.Envelope(); + private readonly IListener theListener = Substitute.For(); + private readonly IHandlerPipeline thePipeline = Substitute.For(); + private readonly DurableReceiver theReceiver; + private readonly MockWolverineRuntime theRuntime; + + public when_durable_receiver_detects_duplicate_incoming_envelopes_in_batch() + { + theRuntime = new MockWolverineRuntime(); + var stubEndpoint = new StubEndpoint("one", new StubTransport()); + theReceiver = new DurableReceiver(stubEndpoint, theRuntime, thePipeline); + + // The batch insert fails with a typed duplicate exception. DurableReceiver + // re-posts every envelope through the per-envelope path, where the single + // StoreIncomingAsync correctly distinguishes the actual duplicate from the + // fresh one. + theRuntime.Storage.Inbox + .StoreIncomingAsync(Arg.Any>()) + .Throws(new DuplicateIncomingEnvelopeException(new[] { theDuplicate })); + + theRuntime.Storage.Inbox + .StoreIncomingAsync(theDuplicate) + .Throws(new DuplicateIncomingEnvelopeException(theDuplicate)); + + theRuntime.Storage.Inbox + .StoreIncomingAsync(theFreshEnvelope) + .Returns(Task.CompletedTask); + } + + public async Task InitializeAsync() + { + var now = DateTimeOffset.UtcNow; + await theReceiver.ProcessReceivedMessagesAsync(now, theListener, new[] { theDuplicate, theFreshEnvelope }); + await theReceiver.DrainAsync(); + } + + public Task DisposeAsync() => Task.CompletedTask; + + [Fact] + public async Task the_duplicate_listener_was_completed() + { + await theListener.Received().CompleteAsync(theDuplicate); + } + + [Fact] + public async Task the_duplicate_envelope_was_not_processed() + { + await thePipeline.DidNotReceive().InvokeAsync(theDuplicate, theReceiver); + } + + [Fact] + public async Task the_fresh_envelope_was_re_attempted_through_the_per_envelope_path() + { + // The per-envelope StoreIncomingAsync is the deduplication checkpoint: + // a duplicate throws and is completed at the listener; a fresh envelope + // succeeds and the receiver enqueues it for the pipeline. Asserting at + // the inbox layer verifies the fall-back contract directly, without + // depending on the receiver's downstream Dataflow drain ordering. + await theRuntime.Storage.Inbox.Received().StoreIncomingAsync(theFreshEnvelope); + } +} diff --git a/src/Testing/Wolverine.ComplianceTests/MessageStoreCompliance.cs b/src/Testing/Wolverine.ComplianceTests/MessageStoreCompliance.cs index 2dd749979..4a4cbcd4c 100644 --- a/src/Testing/Wolverine.ComplianceTests/MessageStoreCompliance.cs +++ b/src/Testing/Wolverine.ComplianceTests/MessageStoreCompliance.cs @@ -162,6 +162,35 @@ await Should.ThrowAsync(async () => }); } + [Fact] + public async Task bulk_store_intra_batch_duplicate_reports_only_actual_duplicates() + { + var existing = ObjectMother.Envelope(); + existing.Destination = new Uri("stub://incoming-bulk-dup"); + existing.Status = EnvelopeStatus.Incoming; + await thePersistence.Inbox.StoreIncomingAsync(existing); + + var fresh1 = ObjectMother.Envelope(); + fresh1.Destination = existing.Destination; + fresh1.Status = EnvelopeStatus.Incoming; + + var fresh2 = ObjectMother.Envelope(); + fresh2.Destination = existing.Destination; + fresh2.Status = EnvelopeStatus.Incoming; + + var batch = new[] { fresh1, existing, fresh2 }; + + var ex = await Should.ThrowAsync( + () => thePersistence.Inbox.StoreIncomingAsync(batch)); + + // Only the actually-existing envelope is reported as a duplicate. + // Fresh envelopes must NOT appear in Duplicates — otherwise DurableReceiver + // would route them straight to listener.CompleteAsync and the handler would + // never run for legitimate messages. + ex.Duplicates.Count.ShouldBe(1); + ex.Duplicates.Single().Id.ShouldBe(existing.Id); + } + [Fact] public async Task store_a_single_outgoing_envelope() { diff --git a/src/Transports/Kafka/Wolverine.Kafka.Tests/duplicate_message_handling_with_postgres_inbox.cs b/src/Transports/Kafka/Wolverine.Kafka.Tests/duplicate_message_handling_with_postgres_inbox.cs new file mode 100644 index 000000000..32d873907 --- /dev/null +++ b/src/Transports/Kafka/Wolverine.Kafka.Tests/duplicate_message_handling_with_postgres_inbox.cs @@ -0,0 +1,181 @@ +using System.Text; +using System.Text.Json; +using Confluent.Kafka; +using IntegrationTests; +using JasperFx.Core; +using JasperFx.Resources; +using Microsoft.Extensions.Hosting; +using Shouldly; +using Wolverine.Kafka.Internals; +using Wolverine.Postgresql; +using Wolverine.Runtime; +using Wolverine.Tracking; +using Wolverine.Util; +using Xunit; + +namespace Wolverine.Kafka.Tests; + +/// +/// End-to-end coverage that a duplicate envelope id arriving over the real Kafka +/// transport, against a real Postgres durable inbox, is silently discarded and the +/// partition keeps flowing. This exercises the single-envelope path +/// (KafkaListener -> DurableReceiver.receiveOneAsync -> single StoreIncomingAsync), +/// which is the path Kafka actually uses today, and confirms the SqlState-based +/// duplicate detection works locale-independently against a live driver. +/// +/// It does NOT exercise the batch-insert path -- that is covered by the +/// MessageStoreCompliance tests for each RDBMS backend. +/// +[Trait("Category", "Flaky")] +public class duplicate_message_handling_with_postgres_inbox : IAsyncLifetime +{ + private IHost _host = null!; + private string _topicName = null!; + + public async Task InitializeAsync() + { + DupTestHandler.Reset(); + + _topicName = $"dup-test-{Guid.NewGuid():N}"; + + _host = await Host.CreateDefaultBuilder() + .UseWolverine(opts => + { + opts.UseKafka(KafkaContainerFixture.ConnectionString) + .AutoProvision() + .ConfigureConsumers(c => c.AutoOffsetReset = AutoOffsetReset.Earliest); + opts.PersistMessagesWithPostgresql(Servers.PostgresConnectionString, "kafka_dup_test"); + + opts.ListenToKafkaTopic(_topicName).UseDurableInbox(); + + opts.Discovery.IncludeAssembly(GetType().Assembly); + opts.Services.AddResourceSetupOnStartup(); + }).StartAsync(); + } + + public async Task DisposeAsync() + { + await _host.StopAsync(); + _host.Dispose(); + } + + [Fact] + public async Task duplicate_message_is_discarded_and_partition_continues() + { + var fixedId = Guid.NewGuid(); + var freshId = Guid.NewGuid(); + + var transport = _host.GetRuntime().Options.Transports.GetOrCreate(); + var producerBuilder = new ProducerBuilder(transport.ProducerConfig); + using var producer = producerBuilder.Build(); + + await ProduceAsync(producer, _topicName, fixedId, new DupTestMessage("first")); + await ProduceAsync(producer, _topicName, fixedId, new DupTestMessage("duplicate")); + await ProduceAsync(producer, _topicName, freshId, new DupTestMessage("third")); + producer.Flush(); + + // Wait until both unique envelope IDs have been processed by the handler. + var deadline = DateTimeOffset.UtcNow.AddSeconds(30); + while (DateTimeOffset.UtcNow < deadline) + { + if (DupTestHandler.HandledIds.Contains(fixedId) + && DupTestHandler.HandledIds.Contains(freshId)) + { + break; + } + + await Task.Delay(250); + } + + DupTestHandler.HandledIds.ShouldContain(fixedId); + DupTestHandler.HandledIds.ShouldContain(freshId); + + // The handler must run exactly once for the fixedId — the duplicate is discarded. + DupTestHandler.HandledIdHistory.Count(id => id == fixedId).ShouldBe(1); + + // Bodies seen: "first" and "third" — never "duplicate". + DupTestHandler.HandledBodies.ShouldContain("first"); + DupTestHandler.HandledBodies.ShouldContain("third"); + DupTestHandler.HandledBodies.ShouldNotContain("duplicate"); + } + + private static Task> ProduceAsync( + IProducer producer, + string topic, + Guid envelopeId, + DupTestMessage payload) + { + var headers = new Headers + { + { "id", Encoding.UTF8.GetBytes(envelopeId.ToString()) }, + { "message-type", Encoding.UTF8.GetBytes(typeof(DupTestMessage).ToMessageTypeName()) }, + { "content-type", Encoding.UTF8.GetBytes("application/json") } + }; + + var body = JsonSerializer.SerializeToUtf8Bytes(payload); + + return producer.ProduceAsync(topic, new Message + { + Key = envelopeId.ToString(), + Value = body, + Headers = headers + }); + } +} + +public class DupTestMessage +{ + public DupTestMessage() + { + } + + public DupTestMessage(string body) + { + Body = body; + } + + public string Body { get; set; } = null!; +} + +public static class DupTestHandler +{ + private static readonly object _lock = new(); + private static readonly List _handledIdHistory = new(); + private static readonly HashSet _handledIds = new(); + private static readonly List _handledBodies = new(); + + public static IReadOnlyList HandledIdHistory + { + get { lock (_lock) { return _handledIdHistory.ToArray(); } } + } + + public static IReadOnlyCollection HandledIds + { + get { lock (_lock) { return _handledIds.ToArray(); } } + } + + public static IReadOnlyList HandledBodies + { + get { lock (_lock) { return _handledBodies.ToArray(); } } + } + + public static void Reset() + { + lock (_lock) + { + _handledIdHistory.Clear(); + _handledIds.Clear(); + _handledBodies.Clear(); + } + } + + public static void Handle(DupTestMessage message, Envelope envelope) + { + lock (_lock) + { + _handledIdHistory.Add(envelope.Id); + _handledIds.Add(envelope.Id); + _handledBodies.Add(message.Body); + } + } +} diff --git a/src/Wolverine/Persistence/Durability/DuplicateIncomingEnvelopeException.cs b/src/Wolverine/Persistence/Durability/DuplicateIncomingEnvelopeException.cs index a9212ebba..370a08584 100644 --- a/src/Wolverine/Persistence/Durability/DuplicateIncomingEnvelopeException.cs +++ b/src/Wolverine/Persistence/Durability/DuplicateIncomingEnvelopeException.cs @@ -5,5 +5,30 @@ public class DuplicateIncomingEnvelopeException : Exception public DuplicateIncomingEnvelopeException(Envelope envelope) : base( $"Duplicate incoming envelope with message id {envelope.Id} at {envelope.Destination}") { + Duplicates = new[] { envelope }; } -} \ No newline at end of file + + /// + /// When the source can pinpoint each duplicate, contains + /// only the actual duplicates. When the source cannot tell (e.g. a transactional + /// batch insert that was rolled back as a unit), the entire batch is reported + /// here as "potentially duplicates" and callers should treat the list as + /// informational rather than authoritative. + /// + public DuplicateIncomingEnvelopeException(IReadOnlyList duplicates) : base( + Format(duplicates)) + { + Duplicates = duplicates; + } + + private static string Format(IReadOnlyList duplicates) + { + ArgumentNullException.ThrowIfNull(duplicates); + if (duplicates.Count == 0) + throw new ArgumentException("At least one envelope is required", nameof(duplicates)); + + return $"Duplicate incoming envelopes detected ({duplicates.Count} envelope(s))"; + } + + public IReadOnlyList Duplicates { get; } +} diff --git a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs index 6d616443b..13ae67e1e 100644 --- a/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs +++ b/src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs @@ -579,9 +579,15 @@ public async ValueTask ProcessReceivedMessagesAsync(DateTimeOffset now, IListene } catch (DuplicateIncomingEnvelopeException) { - // The batch contained a duplicate (e.g. broker redelivery race). The inbox is fine; - // fall through to the per-envelope path which dedupes correctly. Do NOT pause the listener. - foreach (var envelope in envelopes) await _receivingOne.PostAsync(envelope); + // The batch contained at least one duplicate. We cannot trust which + // envelopes were actually persisted (some drivers autocommit per + // statement on multi-statement batches), so we re-attempt every + // envelope through the per-envelope path. The single-envelope + // StoreIncomingAsync correctly distinguishes fresh inserts from + // duplicates: fresh ones get persisted and pipelined, duplicates + // throw and are completed at the listener via + // handleDuplicateIncomingEnvelope. Do NOT pause the listener. + foreach (var envelope in envelopes) await _receivingOne.PostAsync(envelope).ConfigureAwait(false); } catch (Exception e) {