Pulsar: fix KeyNotFoundException acking batch messages on partitioned topics (supersedes #2883)#2950
Merged
… topics (supersedes #2883)

Workaround apache/pulsar-dotpulsar#287: DotPulsar's BatchHandler.Add constructs an inner MessageId for each message of a batch via the four-arg ctor and never sets MessageId.Topic, so Consumer.Acknowledge hits _subConsumers[messageId.Topic] with the empty string and throws KeyNotFoundException on partitioned topics. Non-batch messages are unaffected: ConsumerChannel uses ToMessageId(topic) for those, which correctly populates Topic.

PulsarListener.FixMessageId reconstructs the MessageId with the partition sub-topic URI when Topic is empty and Partition >= 0; the helper short-circuits and returns the original instance when Topic is already populated (forward-compat for the upstream fix) or the message is non-partitioned. Applied at the three Acknowledge call sites in PulsarListener: CompleteAsync, DeferAsync, and moveToQueueAsync (the DLQ / retry-letter path).

This supersedes @patrick-cloke-simplisafe's PR #2883 with one substantive change: each call site uses the source consumer's own IConsumer.Topic property (`consumer.Topic` / `sourceConsumer.Topic`) rather than _endpoint.PulsarTopic(). DotPulsar exposes Topic on the IConsumer abstraction (Abstractions/IConsumer.cs), so the same FixMessageId helper works for the retry consumer without branching. The retry consumer is subscribed to {baseTopic}-RETRY (or a user-configured retry topic), and using _endpoint.PulsarTopic() would have reconstructed wrong sub-topic names ("{baseTopic}-partition-N" instead of "{baseTopic}-RETRY-partition-N") and re-triggered the same KeyNotFoundException on partitioned retry topics. Carries Patrick's three unit tests verbatim plus one additional case covering the retry-topic reconstruction.

Investigated whether Envelope.TopicName could replace the reconstruction entirely (per @jeremydmiller's review comment on #2883). It cannot: DotPulsar's IMessage<T> has no Topic property, so the only sources for the partition sub-topic at the ack site are MessageId.Topic when set (works for non-batched) and reconstruction from the consumer's base topic + MessageId.Partition (necessary for the batched-message bug). Populating Envelope.TopicName from the same reconstruction would just centralize the formatting in PulsarEnvelopeMapper; it doesn't eliminate it. A separate Envelope.TopicName improvement for user-facing topic visibility on incoming Pulsar messages is worth pursuing in its own PR but out of scope for this fix.

Co-Authored-By: Patrick Cloke <202853352+patrick-cloke-simplisafe@users.noreply.github.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
jeremydmiller added a commit that referenced this pull request May 28, 2026

Bug-fix + feature release on top of 6.1.0, 13 PRs.

Notable additions:
- Custom Result<T> handler-return-value support (Phases 0+1+2+3, #2952, refs #2221)
- DbContext abstractions for EF Core transaction middleware (#2919 + docs/tests #2954)
- Outgoing Envelope pooling at MessageRouter.RouteForPublish (#2956, closes #2955): ~-504 B/op on transport-bound sends per the CritterStackScalability WolverineTransportBenchmarks harness

Bug fixes: scheduled-cascade loss from [ReadAggregate]/[DocumentExists] handlers (#2941), ancillary-store inbox routing (#2944), Postgres queue-name length (#2942), MySQL node-record quoting (#2940), Pulsar batched-partition ack KeyNotFoundException (#2883/#2950), remote-node agent reply timeout (#2949), and additional resource-disposal cleanup (#2894 from dmytro-pryvedeniuk).

Polecat bumped 4.1.1 -> 4.2.1 (#2947); Marten + JasperFx families unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Supersedes #2883. Co-authored with @patrick-cloke-simplisafe, who diagnosed the upstream DotPulsar bug and wrote the original fix.
The bug

DotPulsar's `BatchHandler.Add` (apache/pulsar-dotpulsar#287) constructs the inner `MessageId` for each message of a batch via the four-arg ctor and never sets `MessageId.Topic`. `Consumer.Acknowledge` then does `_subConsumers[messageId.Topic]` with the empty string and throws `KeyNotFoundException` on partitioned topics, so batched messages on partitioned topics are never acked → infinite redelivery.

Non-batched messages are unaffected: `ConsumerChannel` uses `ToMessageId(topic)` for those, which correctly populates `Topic`.

Fix

`PulsarListener.FixMessageId` reconstructs the `MessageId` with the partition sub-topic URI (`{topic}-partition-{N}`) when `Topic` is empty and `Partition >= 0`. Short-circuits and returns the original `MessageId` instance otherwise (forward-compat for when the upstream DotPulsar fix lands; non-partitioned messages pass through unchanged).

Applied at the three `Acknowledge` call sites in `PulsarListener`: `CompleteAsync`, `DeferAsync`, and `moveToQueueAsync` (the DLQ / retry-letter path).

Difference from #2883

One substantive change: each ack call site uses the source consumer's own `IConsumer.Topic` (`consumer.Topic` / `sourceConsumer.Topic`) rather than `_endpoint.PulsarTopic()`. DotPulsar exposes `Topic` on the `IConsumer` abstraction (verified in `DotPulsar/Abstractions/IConsumer.cs`; stable since 3.x), so the same helper works correctly for the retry consumer without any branching. The retry consumer is subscribed to `{baseTopic}-RETRY` (or a user-configured retry topic), so the original `_endpoint.PulsarTopic()`-based reconstruction would have produced wrong sub-topic names (`{baseTopic}-partition-N` instead of `{baseTopic}-RETRY-partition-N`) and re-triggered the same `KeyNotFoundException` on the retry path for users with partitioned retry topics. Latent in the original PR; addressed here.

Carries Patrick's three unit tests verbatim, plus one new test (`FixMessageId_BatchMessageOnPartitionedRetryTopic_UsesRetryTopicForReconstruction`) that pins the retry-topic reconstruction contract.

On `Envelope.TopicName`

@jeremydmiller asked on #2883 whether `Envelope.TopicName` could carry the topic and avoid URI reconstruction entirely. It can't. DotPulsar's `IMessage<T>` has no `Topic` property: the concrete `Message<T>` ctor takes no `topic` parameter, and `MessageFactory<T>` doesn't even know the topic it's constructing for. The only sources for the partition sub-topic at the ack site are (a) `MessageId.Topic` when set (works for non-batched) and (b) reconstruction from the consumer's base topic + `MessageId.Partition`. Populating `Envelope.TopicName` from that same reconstruction would just centralize the formatting in `PulsarEnvelopeMapper`; it doesn't eliminate it.

A separate enhancement to surface topic info to user handlers via `Envelope.TopicName` on incoming Pulsar messages is worth pursuing in its own PR, but it's out of scope for the bug fix here.

Verification

- `FixMessageIdTests`: 4/4 pass locally (net9.0).
- `dotnet build wolverine.slnx -c Release` clean (0 warnings, 0 errors).
- The broader test suite (`PulsarTransportComplianceTests`, `InlinePulsarTransportComplianceTests`, `with_cloud_events`) has 11 pre-existing local failures around retry/DLQ. Verified that the same tests fail identically on baseline `origin/main` with my fix stashed (`BadImageFormatException` during handler load → Wolverine runtime-codegen issue with Apple-Silicon macOS, unrelated to this change). CI on Linux x64 validates them.

🤖 Generated with Claude Code
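
For readers who want to see the shape of the failure and the workaround described under "The bug", "Fix", and "Difference from #2883", here is a minimal self-contained sketch. It is not the code from this PR and does not reference DotPulsar: `SketchMessageId`, `Sketch.FixMessageId`, the `subConsumers` dictionary, and the `persistent://public/default/orders` topic names are all hypothetical stand-ins chosen for illustration. The only behavior taken from the description is the rule itself: rebuild the topic as `{topic}-partition-{N}` when `Topic` is empty and `Partition >= 0`, otherwise return the original instance.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a partitioned consumer's per-partition lookup,
// keyed by partition sub-topic. Here the consumer is on a retry topic.
var subConsumers = new Dictionary<string, string>
{
    ["persistent://public/default/orders-RETRY-partition-0"] = "retry sub-consumer 0",
    ["persistent://public/default/orders-RETRY-partition-1"] = "retry sub-consumer 1",
};

// A batched message id as described above: partition known, Topic left empty.
var batched = new SketchMessageId(LedgerId: 10, EntryId: 3, Partition: 1, BatchIndex: 0, Topic: "");

try
{
    // Dictionary's indexer throws when the key is missing.
    _ = subConsumers[batched.Topic];
}
catch (KeyNotFoundException)
{
    Console.WriteLine("empty Topic: KeyNotFoundException");
}

// Rebuilding from the endpoint's base topic would miss the retry sub-topics.
var fromBaseTopic = Sketch.FixMessageId(batched, "persistent://public/default/orders");
Console.WriteLine(subConsumers.ContainsKey(fromBaseTopic.Topic)); // False

// Rebuilding from the topic of the consumer that received the message works.
var fromConsumerTopic = Sketch.FixMessageId(batched, "persistent://public/default/orders-RETRY");
Console.WriteLine(subConsumers[fromConsumerTopic.Topic]); // retry sub-consumer 1

// Hypothetical simplified message id; the real DotPulsar type is not used here.
public sealed record SketchMessageId(ulong LedgerId, ulong EntryId, int Partition, int BatchIndex, string Topic);

public static class Sketch
{
    // Assumed shape of the helper, following the rule stated in the description.
    public static SketchMessageId FixMessageId(SketchMessageId id, string consumerTopic)
    {
        // Pass through when Topic is already set or the message is non-partitioned.
        if (!string.IsNullOrEmpty(id.Topic) || id.Partition < 0)
        {
            return id;
        }

        return id with { Topic = $"{consumerTopic}-partition-{id.Partition}" };
    }
}
```

Passing the consumer's topic as a parameter, rather than reading it from the endpoint inside the helper, is what lets one helper serve both the main and the retry consumer in this sketch.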