From 39add92e7a3a3014f85718bba296fd531b310016 Mon Sep 17 00:00:00 2001 From: "Jeremy D. Miller" Date: Thu, 28 May 2026 14:11:03 -0500 Subject: [PATCH] Pulsar: fix KeyNotFoundException acking batch messages on partitioned topics (supersedes #2883) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Workaround https://github.com/apache/pulsar-dotpulsar/issues/287: DotPulsar's BatchHandler.Add constructs an inner MessageId for each message of a batch via the four-arg ctor and never sets MessageId.Topic, so Consumer.Acknowledge hits _subConsumers[messageId.Topic] with the empty string and throws KeyNotFoundException on partitioned topics. Non-batch messages are unaffected: ConsumerChannel uses ToMessageId(topic) for those, which correctly populates Topic. PulsarListener.FixMessageId reconstructs the MessageId with the partition sub-topic URI when Topic is empty and Partition >= 0; the helper short- circuits and returns the original instance when Topic is already populated (forward-compat for the upstream fix) or the message is non-partitioned. Applied at the three Acknowledge call sites in PulsarListener: CompleteAsync, DeferAsync, and moveToQueueAsync (the DLQ / retry-letter path). This supersedes @patrick-cloke-simplisafe's PR #2883 with one substantive change: each call site uses the source consumer's own IConsumer.Topic property (`consumer.Topic` / `sourceConsumer.Topic`) rather than _endpoint.PulsarTopic(). DotPulsar exposes Topic on the IConsumer abstraction (Abstractions/IConsumer.cs), so the same FixMessageId helper works for the retry consumer without branching - the retry consumer is subscribed to {baseTopic}-RETRY (or a user-configured retry topic), and using _endpoint.PulsarTopic() would have reconstructed wrong sub-topic names ("{baseTopic}-partition-N" instead of "{baseTopic}-RETRY-partition-N") and re-triggered the same KeyNotFoundException on partitioned retry topics. Carries Patrick's three unit tests verbatim plus one additional case covering the retry-topic reconstruction. Investigated whether Envelope.TopicName could replace the reconstruction entirely (per @jeremydmiller's review comment on #2883). It cannot: DotPulsar's IMessage has no Topic property — the only sources for the partition sub-topic at the ack site are MessageId.Topic when set (works for non-batched) and reconstruction from the consumer's base topic + MessageId.Partition (necessary for the batched-message bug). Populating Envelope.TopicName from the same reconstruction would just centralize the formatting in PulsarEnvelopeMapper; it doesn't eliminate it. A separate Envelope.TopicName improvement for user-facing topic visibility on incoming Pulsar messages is worth pursuing in its own PR but out of scope for this fix. Co-Authored-By: Patrick Cloke <202853352+patrick-cloke-simplisafe@users.noreply.github.com> Co-Authored-By: Claude Opus 4.7 (1M context) --- .../FixMessageIdTests.cs | 73 +++++++++++++++++++ .../Pulsar/Wolverine.Pulsar/PulsarListener.cs | 29 +++++++- 2 files changed, 99 insertions(+), 3 deletions(-) create mode 100644 src/Transports/Pulsar/Wolverine.Pulsar.Tests/FixMessageIdTests.cs diff --git a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/FixMessageIdTests.cs b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/FixMessageIdTests.cs new file mode 100644 index 000000000..a4afc4030 --- /dev/null +++ b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/FixMessageIdTests.cs @@ -0,0 +1,73 @@ +using DotPulsar; +using Shouldly; +using Xunit; + +namespace Wolverine.Pulsar.Tests; + +// Regression for the DotPulsar batch-handler partitioned-topic bug +// (https://github.com/apache/pulsar-dotpulsar/issues/287). Builds on @patrick-cloke-simplisafe's +// original repro in PR #2883; the only structural difference here is that PulsarListener now +// reconstructs the partition sub-topic URI from the source consumer's own Topic +// (IConsumer.Topic) rather than from _endpoint.PulsarTopic(). That additionally covers the +// retry-consumer ack path - the retry consumer is subscribed to {baseTopic}-RETRY (or a +// user-configured retry topic), so reusing the listener's base endpoint URI would have +// reconstructed wrong sub-topic names and produced the same KeyNotFoundException on partitioned +// retry topics. +public class FixMessageIdTests +{ + private const string TopicUri = "persistent://public/default/events"; + private const string RetryTopicUri = "persistent://public/default/events-RETRY"; + + [Fact] + public void FixMessageId_BatchMessageMissingTopic_ReconstructsPartitionTopic() + { + var messageId = new MessageId(1UL, 2UL, partition: 3, batchIndex: 0); + + var result = PulsarListener.FixMessageId(messageId, TopicUri); + + result.Topic.ShouldBe($"{TopicUri}-partition-3"); + result.LedgerId.ShouldBe(1UL); + result.EntryId.ShouldBe(2UL); + result.Partition.ShouldBe(3); + result.BatchIndex.ShouldBe(0); + } + + [Fact] + public void FixMessageId_TopicAlreadySet_ReturnsOriginal() + { + var messageId = new MessageId(1UL, 2UL, partition: 3, batchIndex: 0, topic: $"{TopicUri}-partition-3"); + + var result = PulsarListener.FixMessageId(messageId, TopicUri); + + result.ShouldBeSameAs(messageId); + } + + [Fact] + public void FixMessageId_NonPartitionedMessage_ReturnsOriginal() + { + var messageId = new MessageId(1UL, 2UL, partition: -1, batchIndex: -1); + + var result = PulsarListener.FixMessageId(messageId, TopicUri); + + result.ShouldBeSameAs(messageId); + } + + [Fact] + public void FixMessageId_BatchMessageOnPartitionedRetryTopic_UsesRetryTopicForReconstruction() + { + // When the source consumer is the retry consumer, the listener passes that consumer's + // Topic - which is the retry topic URI, not the base topic URI. The reconstruction must + // therefore key the partition sub-topic off the retry topic so the ack lands on the + // correct sub-consumer inside DotPulsar's IConsumer._subConsumers map. Passing the base + // topic URI here (as the prior reconstruction did) would produce + // "events-partition-3" — a key the retry consumer's sub-consumers map does not contain — + // and re-trigger the same KeyNotFoundException the original bug reported on the main + // ack path. + var messageId = new MessageId(1UL, 2UL, partition: 3, batchIndex: 0); + + var result = PulsarListener.FixMessageId(messageId, RetryTopicUri); + + result.Topic.ShouldBe($"{RetryTopicUri}-partition-3"); + result.Partition.ShouldBe(3); + } +} diff --git a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarListener.cs b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarListener.cs index b1894ea99..8ae76fc80 100644 --- a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarListener.cs +++ b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarListener.cs @@ -158,7 +158,7 @@ public ValueTask CompleteAsync(Envelope envelope) var consumer = e.IsFromRetryConsumer && _retryConsumer != null ? _retryConsumer : _consumer; if (consumer != null) { - return consumer.Acknowledge(e.MessageData, _cancellation); + return consumer.Acknowledge(FixMessageId(e.MessageData.MessageId, consumer.Topic), _cancellation); } } @@ -172,7 +172,7 @@ public async ValueTask DeferAsync(Envelope envelope) if (_enableRequeue && _sender is not null && envelope is PulsarEnvelope e) { var consumer = e.IsFromRetryConsumer && _retryConsumer != null ? _retryConsumer : _consumer; - await consumer!.Acknowledge(e.MessageData, _cancellation); + await consumer!.Acknowledge(FixMessageId(e.MessageData.MessageId, consumer.Topic), _cancellation); await _sender.SendAsync(envelope); } } @@ -297,7 +297,7 @@ private async Task moveToQueueAsync(Envelope envelope, Exception? exception, boo } // Acknowledge the original message - await sourceConsumer!.Acknowledge(e.MessageData, _cancellation); + await sourceConsumer!.Acknowledge(FixMessageId(e.MessageData.MessageId, sourceConsumer.Topic), _cancellation); // Send copy to retry/DLQ topic await targetProducer.Send(messageMetadata, e.MessageData.Data, _cancellation) @@ -305,6 +305,29 @@ await targetProducer.Send(messageMetadata, e.MessageData.Data, _cancellation) } } + /// + /// Workaround for https://github.com/apache/pulsar-dotpulsar/issues/287. DotPulsar's + /// BatchHandler.Add constructs the inner for each message of a + /// batch via the four-arg ctor and never sets , so + /// Consumer.Acknowledge hits _subConsumers[messageId.Topic] with the empty + /// string and throws on + /// partitioned topics. Reconstruct the with the partition sub-topic + /// URI built from the the source consumer was subscribed to. + /// Using consumer.Topic rather than _endpoint.PulsarTopic() means the same + /// helper works for the retry consumer (which subscribes to {baseTopic}-RETRY) without + /// any branching — DotPulsar reports the actual subscribed topic per consumer. + /// + internal static MessageId FixMessageId(MessageId messageId, string consumerTopic) + { + if (string.IsNullOrEmpty(messageId.Topic) && messageId.Partition >= 0) + { + return new MessageId(messageId.LedgerId, messageId.EntryId, messageId.Partition, + messageId.BatchIndex, $"{consumerTopic}-partition-{messageId.Partition}"); + } + + return messageId; + } + private MessageMetadata BuildMessageMetadata(Envelope envelope, PulsarEnvelope e, Exception? exception, bool isDeadLettered) {