Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public async Task StoreIncomingAsync(Envelope envelope)

public async Task StoreIncomingAsync(IReadOnlyList<Envelope> envelopes)
{
var duplicates = new List<Envelope>();
foreach (var envelope in envelopes)
{
var incoming = new IncomingMessage(envelope, this);
Expand All @@ -98,9 +99,17 @@ public async Task StoreIncomingAsync(IReadOnlyList<Envelope> envelopes)
}
catch (CosmosException e) when (e.StatusCode == HttpStatusCode.Conflict)
{
// Skip duplicates in batch mode; DurableReceiver will retry one at a time
duplicates.Add(envelope);
}
}

if (duplicates.Count > 0)
{
// Surface so DurableReceiver completes only the actual duplicates at the
// listener and re-pipelines the fresh ones; silently swallowing would
// route every envelope (including the duplicate) to the handler.
throw new DuplicateIncomingEnvelopeException(duplicates);
}
}

public async Task<bool> ExistsAsync(Envelope envelope, CancellationToken cancellation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,35 @@ public async Task StoreIncomingAsync(IReadOnlyList<Envelope> envelopes)
catch (NonUniqueObjectException)
{
// Same envelope identity appeared twice in this batch (e.g. broker
// redelivery race). Surface as a duplicate so DurableReceiver falls back
// to the per-envelope path, which dedupes correctly without double-executing.
throw new DuplicateIncomingEnvelopeException(envelopes[0]);
// redelivery race). Identify which envelopes already exist so
// DurableReceiver only completes the actual duplicates and
// re-pipelines the fresh ones.
throw new DuplicateIncomingEnvelopeException(await findDuplicatesAsync(envelopes));
}
catch (ConcurrencyException)
{
// At least one envelope is already in the inbox; same fallback contract.
throw new DuplicateIncomingEnvelopeException(envelopes[0]);
throw new DuplicateIncomingEnvelopeException(await findDuplicatesAsync(envelopes));
}
}

private async Task<IReadOnlyList<Envelope>> findDuplicatesAsync(IReadOnlyList<Envelope> envelopes)
{
var duplicates = new List<Envelope>();
foreach (var envelope in envelopes)
{
if (await ExistsAsync(envelope, CancellationToken.None).ConfigureAwait(false))
{
duplicates.Add(envelope);
}
}

// Backend reported a duplicate but no envelope id matches an existing
// row (e.g. intra-batch collision with no prior insert). Surface every
// envelope so the per-envelope retry path can sort it out.
return duplicates.Count > 0 ? duplicates : envelopes;
}

public async Task<bool> ExistsAsync(Envelope envelope, CancellationToken cancellation)
{
using var session = _store.OpenAsyncSession();
Expand Down
Loading