Pulsar: Fix KeyNotFoundException ACKing batch messages on partitioned topics#2883
Conversation
…Pulsar topics Workaround apache/pulsar-dotpulsar#287: BatchHandler.Add constructs MessageId objects for each message inside a batch without setting the Topic field (it defaults to ""). Consumer.Acknowledge() does _subConsumers[messageId.Topic] to route the ACK to the correct partition sub-consumer — on a partitioned topic "" is never a valid key, so the lookup throws KeyNotFoundException and the message is never acknowledged, causing infinite redelivery. Non-batch messages are unaffected: ConsumerChannel uses ToMessageId(topic) for those, which correctly populates Topic. The new FixMessageId() helper reconstructs the MessageId with the correct partition sub-topic URI ({topic}-partition-{partition}) when Topic is empty and Partition >= 0. Applied at all three Acknowledge call sites in PulsarListener.
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
This PR addresses a DotPulsar batching issue where MessageId instances can be missing the topic, causing acknowledgements to fail on partitioned topics.
Changes:
- Update ack paths to acknowledge using a reconstructed
MessageIdthat includes the correct partition topic. - Introduce
PulsarListener.FixMessageIdhelper to rebuild missing topic info for partitioned messages. - Add unit tests covering reconstruction and no-op scenarios.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| src/Transports/Pulsar/Wolverine.Pulsar/PulsarListener.cs | Reconstructs MessageId.Topic when missing and uses it for acknowledgements to avoid KeyNotFoundException on partitioned topics. |
| src/Transports/Pulsar/Wolverine.Pulsar.Tests/FixMessageIdTests.cs | Adds tests validating FixMessageId behavior for missing-topic batch messages and no-op cases. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| return new MessageId(messageId.LedgerId, messageId.EntryId, messageId.Partition, | ||
| messageId.BatchIndex, $"{topicUri}-partition-{messageId.Partition}"); | ||
| } | ||
|
|
||
| return messageId; | ||
| } | ||
|
|
There was a problem hiding this comment.
This shouldn't be possible, topicUri is always from _endpoint.PulsarTopic(), which is the base URI already.
Happy to make changes (or note this in documentation) if desired.
| { | ||
| var messageId = new MessageId(1UL, 2UL, partition: 3, batchIndex: 0); | ||
|
|
||
| var result = PulsarListener.FixMessageId(messageId, TopicUri); |
There was a problem hiding this comment.
InternalsVisibleTo is set already:
|
@patrick-cloke-simplisafe Do you want me to be reviewing this and getting it in soon? |
|
@patrick-cloke-simplisafe There is an |
Yes! Sorry for not undrafting this.
To double check the suggestion -- are you thinking that |
|
@patrick-cloke-simplisafe I slowed down the next Wolverine release until tomorrow. I'll investigate a bit more about that last question, but try to get this addressed in what I think will be 6.1.0 tomorrow |
I appreciate that! I think releases are frequent enough it wouldn't hurt my use-case too much to wait until the next one. I'll try to look at what it would take to use the envelope though. |
|
Superseded by #2950, which carries this fix forward with @patrick-cloke-simplisafe credited as co-author and adds one substantive refinement: each The Thanks for the diagnosis and original work here! |
|
Thanks for finishing it up, really appreciate your hard work! 👍 |
…ion-ack Pulsar: fix KeyNotFoundException acking batch messages on partitioned topics (supersedes #2883)
… topics (supersedes JasperFx#2883) Workaround apache/pulsar-dotpulsar#287: DotPulsar's BatchHandler.Add constructs an inner MessageId for each message of a batch via the four-arg ctor and never sets MessageId.Topic, so Consumer.Acknowledge hits _subConsumers[messageId.Topic] with the empty string and throws KeyNotFoundException on partitioned topics. Non-batch messages are unaffected: ConsumerChannel uses ToMessageId(topic) for those, which correctly populates Topic. PulsarListener.FixMessageId reconstructs the MessageId with the partition sub-topic URI when Topic is empty and Partition >= 0; the helper short- circuits and returns the original instance when Topic is already populated (forward-compat for the upstream fix) or the message is non-partitioned. Applied at the three Acknowledge call sites in PulsarListener: CompleteAsync, DeferAsync, and moveToQueueAsync (the DLQ / retry-letter path). This supersedes @patrick-cloke-simplisafe's PR JasperFx#2883 with one substantive change: each call site uses the source consumer's own IConsumer.Topic property (`consumer.Topic` / `sourceConsumer.Topic`) rather than _endpoint.PulsarTopic(). DotPulsar exposes Topic on the IConsumer abstraction (Abstractions/IConsumer.cs), so the same FixMessageId helper works for the retry consumer without branching - the retry consumer is subscribed to {baseTopic}-RETRY (or a user-configured retry topic), and using _endpoint.PulsarTopic() would have reconstructed wrong sub-topic names ("{baseTopic}-partition-N" instead of "{baseTopic}-RETRY-partition-N") and re-triggered the same KeyNotFoundException on partitioned retry topics. Carries Patrick's three unit tests verbatim plus one additional case covering the retry-topic reconstruction. Investigated whether Envelope.TopicName could replace the reconstruction entirely (per @jeremydmiller's review comment on JasperFx#2883). It cannot: DotPulsar's IMessage<T> has no Topic property — the only sources for the partition sub-topic at the ack site are MessageId.Topic when set (works for non-batched) and reconstruction from the consumer's base topic + MessageId.Partition (necessary for the batched-message bug). Populating Envelope.TopicName from the same reconstruction would just centralize the formatting in PulsarEnvelopeMapper; it doesn't eliminate it. A separate Envelope.TopicName improvement for user-facing topic visibility on incoming Pulsar messages is worth pursuing in its own PR but out of scope for this fix. Co-Authored-By: Patrick Cloke <202853352+patrick-cloke-simplisafe@users.noreply.github.com> Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Bug-fix + feature release on top of 6.1.0 — 13 PRs. Notable additions: - Custom Result<T> handler-return-value support (Phases 0+1+2+3, #2952, refs #2221) - DbContext abstractions for EF Core transaction middleware (#2919 + docs/tests #2954) - Outgoing Envelope pooling at MessageRouter.RouteForPublish (#2956, closes #2955) — ~-504 B/op on transport-bound sends per the CritterStackScalability WolverineTransportBenchmarks harness Bug fixes: scheduled-cascade loss from [ReadAggregate]/[DocumentExists] handlers (#2941), ancillary-store inbox routing (#2944), Postgres queue-name length (#2942), MySQL node-record quoting (#2940), Pulsar batched-partition ack KeyNotFoundException (#2883/#2950), remote-node agent reply timeout (#2949), and additional resource-disposal cleanup (#2894 from dmytro-pryvedeniuk). Polecat bumped 4.1.1 -> 4.2.1 (#2947); Marten + JasperFx families unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Workaround apache/pulsar-dotpulsar#287:
BatchHandler.AddconstructsMessageIdobjects for each message inside a batch without setting the Topic field (it defaults to "").Consumer.Acknowledge()does_subConsumers[messageId.Topic]to route the ACK to the correct partition sub-consumer — on a partitioned topic""is never a valid key, so the lookup throwsKeyNotFoundExceptionand the message is never acknowledged, causing infinite redelivery.Non-batch messages are unaffected:
ConsumerChannelusesToMessageId(topic)for those, which correctly populatesTopic.The new
FixMessageId()helper reconstructs theMessageIdwith the correct partition sub-topic URI ({topic}-partition-{partition}) when Topic is empty and Partition >= 0. Applied at all threeAcknowledgecall sites inPulsarListener.