From ef197b6a4daeba9ab3590ff2b162a76f7f9a7698 Mon Sep 17 00:00:00 2001 From: Christian Date: Fri, 1 May 2026 15:11:30 +0200 Subject: [PATCH] fix(rdbms-inbox): make RavenDb and CosmosDb batch inbox throw typed duplicates MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two providers had their own StoreIncomingAsync(IReadOnlyList) overrides that did not honor the new DurableReceiver fall-back contract. Both silently broke message handling once the receiver started relying on a typed DuplicateIncomingEnvelopeException to switch to the per-envelope path: - RavenDb threw DuplicateIncomingEnvelopeException(envelopes[0]) for both NonUniqueObjectException and ConcurrencyException. That made the first envelope in the batch look like the duplicate whenever the actual colliding envelope was not first — DurableReceiver then acked an innocent fresh envelope at the listener and the real duplicate slipped through. Replaced with a findDuplicatesAsync helper that probes ExistsAsync per envelope and reports only the ids that already exist. Falls back to the full batch when no pre-existing match is found (e.g. a same-identity intra-batch collision with no prior insert) so the per-envelope retry path still has something to sort, and the existing Raven-only "throws on [envelope, envelope]" test keeps passing. - CosmosDb silently swallowed HTTP 409 with the comment "DurableReceiver will retry one at a time" — but post-receiver change, the retry only fires when DuplicateIncomingEnvelopeException is thrown. Without it the batch returned successfully and every envelope, including the duplicate, was routed to the handler. Now collects the conflicting envelopes and throws DuplicateIncomingEnvelopeException(duplicates) at the end of the loop so the rest of the batch still commits. (CosmosDb's compliance subclass is not currently picked up by the build script's test-class discovery, so the new compliance test does not exercise this on CI — the fix is the same shape as the Oracle one and the shared compliance assertion documents the contract.) --- .../Internals/CosmosDbMessageStore.Inbox.cs | 11 +++++++- .../Internals/RavenDbMessageStore.Inbox.cs | 26 ++++++++++++++++--- 2 files changed, 32 insertions(+), 5 deletions(-) diff --git a/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.Inbox.cs b/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.Inbox.cs index a0d35c990..f782b09ce 100644 --- a/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.Inbox.cs +++ b/src/Persistence/Wolverine.CosmosDb/Internals/CosmosDbMessageStore.Inbox.cs @@ -89,6 +89,7 @@ public async Task StoreIncomingAsync(Envelope envelope) public async Task StoreIncomingAsync(IReadOnlyList envelopes) { + var duplicates = new List(); foreach (var envelope in envelopes) { var incoming = new IncomingMessage(envelope, this); @@ -98,9 +99,17 @@ public async Task StoreIncomingAsync(IReadOnlyList envelopes) } catch (CosmosException e) when (e.StatusCode == HttpStatusCode.Conflict) { - // Skip duplicates in batch mode; DurableReceiver will retry one at a time + duplicates.Add(envelope); } } + + if (duplicates.Count > 0) + { + // Surface so DurableReceiver completes only the actual duplicates at the + // listener and re-pipelines the fresh ones; silently swallowing would + // route every envelope (including the duplicate) to the handler. + throw new DuplicateIncomingEnvelopeException(duplicates); + } } public async Task ExistsAsync(Envelope envelope, CancellationToken cancellation) diff --git a/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.Inbox.cs b/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.Inbox.cs index 3ce74e05c..e58897dd3 100644 --- a/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.Inbox.cs +++ b/src/Persistence/Wolverine.RavenDb/Internals/RavenDbMessageStore.Inbox.cs @@ -102,17 +102,35 @@ public async Task StoreIncomingAsync(IReadOnlyList envelopes) catch (NonUniqueObjectException) { // Same envelope identity appeared twice in this batch (e.g. broker - // redelivery race). Surface as a duplicate so DurableReceiver falls back - // to the per-envelope path, which dedupes correctly without double-executing. - throw new DuplicateIncomingEnvelopeException(envelopes[0]); + // redelivery race). Identify which envelopes already exist so + // DurableReceiver only completes the actual duplicates and + // re-pipelines the fresh ones. + throw new DuplicateIncomingEnvelopeException(await findDuplicatesAsync(envelopes)); } catch (ConcurrencyException) { // At least one envelope is already in the inbox; same fallback contract. - throw new DuplicateIncomingEnvelopeException(envelopes[0]); + throw new DuplicateIncomingEnvelopeException(await findDuplicatesAsync(envelopes)); } } + private async Task> findDuplicatesAsync(IReadOnlyList envelopes) + { + var duplicates = new List(); + foreach (var envelope in envelopes) + { + if (await ExistsAsync(envelope, CancellationToken.None).ConfigureAwait(false)) + { + duplicates.Add(envelope); + } + } + + // Backend reported a duplicate but no envelope id matches an existing + // row (e.g. intra-batch collision with no prior insert). Surface every + // envelope so the per-envelope retry path can sort it out. + return duplicates.Count > 0 ? duplicates : envelopes; + } + public async Task ExistsAsync(Envelope envelope, CancellationToken cancellation) { using var session = _store.OpenAsyncSession();