diff --git a/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.Inbox.cs b/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.Inbox.cs index a0d35c990..f782b09ce 100644 --- a/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.Inbox.cs +++ b/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.Inbox.cs @@ -89,6 +89,7 @@ public async Task StoreIncomingAsync(Envelope envelope) public async Task StoreIncomingAsync(IReadOnlyList envelopes) { + var duplicates = new List(); foreach (var envelope in envelopes) { var incoming = new IncomingMessage(envelope, this); @@ -98,9 +99,17 @@ public async Task StoreIncomingAsync(IReadOnlyList envelopes) } catch (CosmosException e) when (e.StatusCode == HttpStatusCode.Conflict) { - // Skip duplicates in batch mode; DurableReceiver will retry one at a time + duplicates.Add(envelope); } } + + if (duplicates.Count > 0) + { + // Surface so DurableReceiver completes only the actual duplicates at the + // listener and re-pipelines the fresh ones; silently swallowing would + // route every envelope (including the duplicate) to the handler. + throw new DuplicateIncomingEnvelopeException(duplicates); + } } public async Task ExistsAsync(Envelope envelope, CancellationToken cancellation) diff --git a/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.Inbox.cs b/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.Inbox.cs index 3ce74e05c..e58897dd3 100644 --- a/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.Inbox.cs +++ b/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.Inbox.cs @@ -102,17 +102,35 @@ public async Task StoreIncomingAsync(IReadOnlyList envelopes) catch (NonUniqueObjectException) { // Same envelope identity appeared twice in this batch (e.g. broker - // redelivery race). Surface as a duplicate so DurableReceiver falls back - // to the per-envelope path, which dedupes correctly without double-executing. - throw new DuplicateIncomingEnvelopeException(envelopes[0]); + // redelivery race). Identify which envelopes already exist so + // DurableReceiver only completes the actual duplicates and + // re-pipelines the fresh ones. + throw new DuplicateIncomingEnvelopeException(await findDuplicatesAsync(envelopes)); } catch (ConcurrencyException) { // At least one envelope is already in the inbox; same fallback contract. - throw new DuplicateIncomingEnvelopeException(envelopes[0]); + throw new DuplicateIncomingEnvelopeException(await findDuplicatesAsync(envelopes)); } } + private async Task> findDuplicatesAsync(IReadOnlyList envelopes) + { + var duplicates = new List(); + foreach (var envelope in envelopes) + { + if (await ExistsAsync(envelope, CancellationToken.None).ConfigureAwait(false)) + { + duplicates.Add(envelope); + } + } + + // Backend reported a duplicate but no envelope id matches an existing + // row (e.g. intra-batch collision with no prior insert). Surface every + // envelope so the per-envelope retry path can sort it out. + return duplicates.Count > 0 ? duplicates : envelopes; + } + public async Task ExistsAsync(Envelope envelope, CancellationToken cancellation) { using var session = _store.OpenAsyncSession();