fix(rdbms-inbox): discard duplicate Kafka messages instead of freezing the partition (#2639)#2648
Merged
jeremydmiller merged 2 commits intoJasperFx:mainfrom May 1, 2026
Conversation
…h paths Duplicate Kafka messages with identical envelope IDs were not discarded when using a durable Postgres inbox: the primary-key constraint violation escaped the duplicate-detection layer, the listener was paused as inbox-unavailable, the Kafka offset never committed, and the partition froze. Fixes JasperFx#2639. Three changes: - Detection (per backend) now uses driver-specific error codes as the primary signal (Postgres SqlState 23505, SQL Server 2627/2601, MySQL 1062, Sqlite extended codes 1555/2067) with message-match as fallback, and traverses InnerException / AggregateException so wrapped exceptions are also recognised. - The two previously unprotected batch paths in MessageDatabase<T> -- StoreIncomingAsync(DbTransaction, Envelope[]) and StoreIncomingAsync(IReadOnlyList<Envelope>) -- now convert duplicate-key errors into DuplicateIncomingEnvelopeException. The list path wraps the multi-statement insert in an explicit transaction so the rollback is uniform across drivers (SqlClient/MySqlConnector/Microsoft.Data.Sqlite autocommit per statement otherwise, which would partially persist a batch on duplicate-key failure). After rollback, the exact duplicates are identified via per-envelope ExistsAsync; the caller retries the rest through the single-envelope path. - DuplicateIncomingEnvelopeException gains a list-accepting constructor and Duplicates property so callers (Marten outbox, direct test scenarios) can inspect which envelopes collided. DurableReceiver's batch path catches the typed exception and re-posts every envelope through the per-envelope path, where the single StoreIncomingAsync correctly distinguishes fresh inserts from duplicates -- fresh ones are pipelined to the handler, duplicates complete at the listener without pausing it. Tests: new compliance test (bulk_store_with_intra_batch_duplicate) runs across Postgres, SQL Server, MySQL, and Sqlite and asserts that only the actually-existing envelope is reported as a duplicate (not innocent fresh envelopes); new mock test covers the multi-envelope branch in DurableReceiver; new Kafka end-to-end test verifies the locale- independent duplicate detection works against a live Postgres driver. Also removes a stray mise.toml that pinned dotnet to 9.0 and conflicted with global.json (which is the project's actual toolchain pin). Closes JasperFx#2639
Three follow-ups to the duplicate-envelope batch handling change, all caught by the PR run: - Oracle: StoreIncomingAsync(IReadOnlyList) and StoreIncomingAsync( DbTransaction, Envelope[]) silently swallowed ORA-00001 with an "Idempotent" comment. That violates the cross-provider contract the new compliance test asserts -- DurableReceiver relies on a thrown DuplicateIncomingEnvelopeException to fall back to the per-envelope path, and without it the listener neither acks duplicates nor pipelines fresh envelopes correctly. Both Oracle batch paths now collect the rejected envelopes and throw DuplicateIncomingEnvelopeException(duplicates) after the loop, so the rest of the batch still commits. - Compliance test rename: the new bulk_store_with_intra_batch_duplicate_throws_DuplicateIncomingEnvelopeException in MessageStoreCompliance collided with an existing same-named Raven-specific override (xUnit1024). The two tests assert different contracts -- the Raven override checks "an exception is thrown", the compliance test additionally asserts that Duplicates contains exactly the actually-existing envelope (and no innocent fresh envelopes that DurableReceiver would otherwise complete-without- processing). Renamed the compliance test to bulk_store_intra_batch_duplicate_reports_only_actual_duplicates so both coexist. - CoreTests assertion: the new DurableReceiver batch test asserted pipeline.Received().InvokeAsync(theFreshEnvelope, ...). That pipeline call sits behind a second Dataflow block (_receiver) which DrainAsync only awaits when _latched is already true; latching before drain would deflect the fresh envelope down the listener- defer path instead, so neither order works for this test. Replaced it with theRuntime.Storage.Inbox.Received().StoreIncomingAsync( theFreshEnvelope), which checks the same contract one layer up (per-envelope path was retried for the fresh envelope) without depending on downstream drain ordering.
3 tasks
This was referenced May 2, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Fixes #2639 — duplicate Kafka messages with identical envelope IDs were not being discarded when using a durable Postgres inbox. The primary-key constraint violation escaped
the duplicate-detection layer, the listener was paused as inbox-unavailable, the Kafka offset never committed, and the partition froze.
The bug surfaced via Kafka but the root cause was in the shared RDBMS inbox layer: detection was locale-dependent (matched the English Postgres error message), and the two
batch insert paths in
MessageDatabase<T>did not convert duplicate-key violations into the typedDuplicateIncomingEnvelopeExceptionthatDurableReceiverknows how tohandle.
Changes
1. Locale-independent duplicate detection (per backend)
Each
isExceptionFromDuplicateEnvelopeoverride now uses driver-specific error codes as the primary signal, with message-match retained as a fallback for unusual wrappers:PostgresException.SqlState == "23505"SqlException.Number ∈ {2627, 2601}MySqlException.Number == 1062SqliteException.SqliteExtendedErrorCode ∈ {1555, 2067}A new
protected bool IsDuplicateEnvelopeException(Exception ex)helper onMessageDatabase<T>traversesInnerExceptionandAggregateException.InnerExceptions, callingthe per-backend hook at each level — so wrapped exceptions (e.g., a
PostgresExceptionnested inside anInvalidOperationExceptionfrom the connection layer) are stillrecognised.
2. Batch-path duplicate conversion
The two previously unprotected paths in
MessageDatabase<T>.Incoming.csnow convert duplicate-key errors intoDuplicateIncomingEnvelopeException:StoreIncomingAsync(DbTransaction, Envelope[])— used by Marten/EF outbox flush. WrapsExecuteNonQueryAsyncin try/catch and converts. The caller's transaction willroll back, which is the correct semantic for an outbox flush colliding with an existing inbox row.
StoreIncomingAsync(IReadOnlyList<Envelope>)— used byDurableReceiver's batch receive. Wraps the multi-statement insert in an explicit transaction so therollback is uniform across drivers. SqlClient / MySqlConnector / Microsoft.Data.Sqlite autocommit per statement otherwise, which would partially persist a batch on
duplicate-key failure and leave the inbox in a state indistinguishable from "envelope was already there". After rollback, the exact duplicates are identified via
per-envelope
ExistsAsync. If the backend reported a duplicate but no envelope id matches an existing row, the original failure is rethrown rather than silently swallowed.3.
DuplicateIncomingEnvelopeException+DurableReceiverglueThe exception gains a list-accepting constructor and an
IReadOnlyList<Envelope> Duplicatesproperty so callers (Marten outbox, direct test scenarios) can inspect whichenvelopes collided. The constructor's XML doc is honest about cases where the source can't pinpoint each duplicate (transactional batch rolled back as a unit) —
Duplicatesis informational rather than authoritative there.
DurableReceiver.ProcessReceivedMessagesAsynccatches the typed exception and re-posts every envelope through the per-envelope path. The single-envelopeStoreIncomingAsyncthen correctly distinguishes each: fresh ones are pipelined to the handler, duplicates throw and are completed at the listener viahandleDuplicateIncomingEnvelope(which callsListener.CompleteAsync— for Kafka, that commits the offset). The listener is not paused.Test plan
when_durable_receiver_detects_duplicate_incoming_envelopes_in_batchmock test covering the mixed-batch routing.bulk_store_with_intra_batch_duplicate_throws_DuplicateIncomingEnvelopeExceptiontest asserts that only the actually-existing envelope isreported as a duplicate (
Count == 1, no fresh envelopes leak intoDuplicates). Runs and passes against:PostgresqlTests, full suite 371/371)SqlServerTests, MessageStore filter 129/129)MySqlTests, MessageStore filter 41/41)SqliteTests, MessageStore filter 41/41)duplicate_message_handling_with_postgres_inboxtest verifies that a duplicate envelope id arriving over the real Kafka transport against areal Postgres durable inbox is silently discarded, the listener is not paused, and a subsequent fresh message is processed normally. This exercises the single-envelope path
(which is what Kafka uses today) and the locale-independent SqlState detection against a live Npgsql driver.
when_durable_receiver_detects_duplicate_incoming_envelope(single-envelope path) still passes;store_a_single_incoming_envelope_that_is_a_duplicatecompliance test still passes across all backends.Notes
OracleMessageStoreimplementation (separate fromMessageDatabase<T>) and is out of scope for this fix. Its single-envelope path already convertsto
DuplicateIncomingEnvelopeException; its batch path silently swallows duplicates. Both are existing behaviour, not changed here.mise.tomlremoval is a small cleanup: it pinneddotnetto9.0whileglobal.json(the project's canonical toolchain pin) requires10.0.100withrollForward: latestMajor. The two configs were in conflict;global.jsonis the source of truth.Duplicatesproperty and list-accepting constructor onDuplicateIncomingEnvelopeException. TheStoreIncomingAsync(DbTransaction, Envelope[])signature went fromTasktoasync Task— source-compatible for any awaiter.Closes #2639