diff --git a/src/Transports/Pulsar/Wolverine.Pulsar.Tests/FixMessageIdTests.cs b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/FixMessageIdTests.cs new file mode 100644 index 000000000..a4afc4030 --- /dev/null +++ b/src/Transports/Pulsar/Wolverine.Pulsar.Tests/FixMessageIdTests.cs @@ -0,0 +1,73 @@ +using DotPulsar; +using Shouldly; +using Xunit; + +namespace Wolverine.Pulsar.Tests; + +// Regression for the DotPulsar batch-handler partitioned-topic bug +// (https://github.com/apache/pulsar-dotpulsar/issues/287). Builds on @patrick-cloke-simplisafe's +// original repro in PR #2883; the only structural difference here is that PulsarListener now +// reconstructs the partition sub-topic URI from the source consumer's own Topic +// (IConsumer.Topic) rather than from _endpoint.PulsarTopic(). That additionally covers the +// retry-consumer ack path - the retry consumer is subscribed to {baseTopic}-RETRY (or a +// user-configured retry topic), so reusing the listener's base endpoint URI would have +// reconstructed wrong sub-topic names and produced the same KeyNotFoundException on partitioned +// retry topics. +public class FixMessageIdTests +{ + private const string TopicUri = "persistent://public/default/events"; + private const string RetryTopicUri = "persistent://public/default/events-RETRY"; + + [Fact] + public void FixMessageId_BatchMessageMissingTopic_ReconstructsPartitionTopic() + { + var messageId = new MessageId(1UL, 2UL, partition: 3, batchIndex: 0); + + var result = PulsarListener.FixMessageId(messageId, TopicUri); + + result.Topic.ShouldBe($"{TopicUri}-partition-3"); + result.LedgerId.ShouldBe(1UL); + result.EntryId.ShouldBe(2UL); + result.Partition.ShouldBe(3); + result.BatchIndex.ShouldBe(0); + } + + [Fact] + public void FixMessageId_TopicAlreadySet_ReturnsOriginal() + { + var messageId = new MessageId(1UL, 2UL, partition: 3, batchIndex: 0, topic: $"{TopicUri}-partition-3"); + + var result = PulsarListener.FixMessageId(messageId, TopicUri); + + result.ShouldBeSameAs(messageId); + } + + [Fact] + public void FixMessageId_NonPartitionedMessage_ReturnsOriginal() + { + var messageId = new MessageId(1UL, 2UL, partition: -1, batchIndex: -1); + + var result = PulsarListener.FixMessageId(messageId, TopicUri); + + result.ShouldBeSameAs(messageId); + } + + [Fact] + public void FixMessageId_BatchMessageOnPartitionedRetryTopic_UsesRetryTopicForReconstruction() + { + // When the source consumer is the retry consumer, the listener passes that consumer's + // Topic - which is the retry topic URI, not the base topic URI. The reconstruction must + // therefore key the partition sub-topic off the retry topic so the ack lands on the + // correct sub-consumer inside DotPulsar's IConsumer._subConsumers map. Passing the base + // topic URI here (as the prior reconstruction did) would produce + // "events-partition-3" — a key the retry consumer's sub-consumers map does not contain — + // and re-trigger the same KeyNotFoundException the original bug reported on the main + // ack path. + var messageId = new MessageId(1UL, 2UL, partition: 3, batchIndex: 0); + + var result = PulsarListener.FixMessageId(messageId, RetryTopicUri); + + result.Topic.ShouldBe($"{RetryTopicUri}-partition-3"); + result.Partition.ShouldBe(3); + } +} diff --git a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarListener.cs b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarListener.cs index b1894ea99..8ae76fc80 100644 --- a/src/Transports/Pulsar/Wolverine.Pulsar/PulsarListener.cs +++ b/src/Transports/Pulsar/Wolverine.Pulsar/PulsarListener.cs @@ -158,7 +158,7 @@ public ValueTask CompleteAsync(Envelope envelope) var consumer = e.IsFromRetryConsumer && _retryConsumer != null ? _retryConsumer : _consumer; if (consumer != null) { - return consumer.Acknowledge(e.MessageData, _cancellation); + return consumer.Acknowledge(FixMessageId(e.MessageData.MessageId, consumer.Topic), _cancellation); } } @@ -172,7 +172,7 @@ public async ValueTask DeferAsync(Envelope envelope) if (_enableRequeue && _sender is not null && envelope is PulsarEnvelope e) { var consumer = e.IsFromRetryConsumer && _retryConsumer != null ? _retryConsumer : _consumer; - await consumer!.Acknowledge(e.MessageData, _cancellation); + await consumer!.Acknowledge(FixMessageId(e.MessageData.MessageId, consumer.Topic), _cancellation); await _sender.SendAsync(envelope); } } @@ -297,7 +297,7 @@ private async Task moveToQueueAsync(Envelope envelope, Exception? exception, boo } // Acknowledge the original message - await sourceConsumer!.Acknowledge(e.MessageData, _cancellation); + await sourceConsumer!.Acknowledge(FixMessageId(e.MessageData.MessageId, sourceConsumer.Topic), _cancellation); // Send copy to retry/DLQ topic await targetProducer.Send(messageMetadata, e.MessageData.Data, _cancellation) @@ -305,6 +305,29 @@ await targetProducer.Send(messageMetadata, e.MessageData.Data, _cancellation) } } + /// + /// Workaround for https://github.com/apache/pulsar-dotpulsar/issues/287. DotPulsar's + /// BatchHandler.Add constructs the inner for each message of a + /// batch via the four-arg ctor and never sets , so + /// Consumer.Acknowledge hits _subConsumers[messageId.Topic] with the empty + /// string and throws on + /// partitioned topics. Reconstruct the with the partition sub-topic + /// URI built from the the source consumer was subscribed to. + /// Using consumer.Topic rather than _endpoint.PulsarTopic() means the same + /// helper works for the retry consumer (which subscribes to {baseTopic}-RETRY) without + /// any branching — DotPulsar reports the actual subscribed topic per consumer. + /// + internal static MessageId FixMessageId(MessageId messageId, string consumerTopic) + { + if (string.IsNullOrEmpty(messageId.Topic) && messageId.Partition >= 0) + { + return new MessageId(messageId.LedgerId, messageId.EntryId, messageId.Partition, + messageId.BatchIndex, $"{consumerTopic}-partition-{messageId.Partition}"); + } + + return messageId; + } + private MessageMetadata BuildMessageMetadata(Envelope envelope, PulsarEnvelope e, Exception? exception, bool isDeadLettered) {