Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions src/Transports/Pulsar/Wolverine.Pulsar.Tests/FixMessageIdTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
using DotPulsar;
using Shouldly;
using Xunit;

namespace Wolverine.Pulsar.Tests;

// Regression for the DotPulsar batch-handler partitioned-topic bug
// (https://github.com/apache/pulsar-dotpulsar/issues/287). Builds on @patrick-cloke-simplisafe's
// original repro in PR #2883; the only structural difference here is that PulsarListener now
// reconstructs the partition sub-topic URI from the source consumer's own Topic
// (IConsumer.Topic) rather than from _endpoint.PulsarTopic(). That additionally covers the
// retry-consumer ack path - the retry consumer is subscribed to {baseTopic}-RETRY (or a
// user-configured retry topic), so reusing the listener's base endpoint URI would have
// reconstructed wrong sub-topic names and produced the same KeyNotFoundException on partitioned
// retry topics.
public class FixMessageIdTests
{
private const string TopicUri = "persistent://public/default/events";
private const string RetryTopicUri = "persistent://public/default/events-RETRY";

[Fact]
public void FixMessageId_BatchMessageMissingTopic_ReconstructsPartitionTopic()
{
var messageId = new MessageId(1UL, 2UL, partition: 3, batchIndex: 0);

var result = PulsarListener.FixMessageId(messageId, TopicUri);

result.Topic.ShouldBe($"{TopicUri}-partition-3");
result.LedgerId.ShouldBe(1UL);
result.EntryId.ShouldBe(2UL);
result.Partition.ShouldBe(3);
result.BatchIndex.ShouldBe(0);
}

[Fact]
public void FixMessageId_TopicAlreadySet_ReturnsOriginal()
{
var messageId = new MessageId(1UL, 2UL, partition: 3, batchIndex: 0, topic: $"{TopicUri}-partition-3");

var result = PulsarListener.FixMessageId(messageId, TopicUri);

result.ShouldBeSameAs(messageId);
}

[Fact]
public void FixMessageId_NonPartitionedMessage_ReturnsOriginal()
{
var messageId = new MessageId(1UL, 2UL, partition: -1, batchIndex: -1);

var result = PulsarListener.FixMessageId(messageId, TopicUri);

result.ShouldBeSameAs(messageId);
}

[Fact]
public void FixMessageId_BatchMessageOnPartitionedRetryTopic_UsesRetryTopicForReconstruction()
{
// When the source consumer is the retry consumer, the listener passes that consumer's
// Topic - which is the retry topic URI, not the base topic URI. The reconstruction must
// therefore key the partition sub-topic off the retry topic so the ack lands on the
// correct sub-consumer inside DotPulsar's IConsumer._subConsumers map. Passing the base
// topic URI here (as the prior reconstruction did) would produce
// "events-partition-3" — a key the retry consumer's sub-consumers map does not contain —
// and re-trigger the same KeyNotFoundException the original bug reported on the main
// ack path.
var messageId = new MessageId(1UL, 2UL, partition: 3, batchIndex: 0);

var result = PulsarListener.FixMessageId(messageId, RetryTopicUri);

result.Topic.ShouldBe($"{RetryTopicUri}-partition-3");
result.Partition.ShouldBe(3);
}
}
29 changes: 26 additions & 3 deletions src/Transports/Pulsar/Wolverine.Pulsar/PulsarListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public ValueTask CompleteAsync(Envelope envelope)
var consumer = e.IsFromRetryConsumer && _retryConsumer != null ? _retryConsumer : _consumer;
if (consumer != null)
{
return consumer.Acknowledge(e.MessageData, _cancellation);
return consumer.Acknowledge(FixMessageId(e.MessageData.MessageId, consumer.Topic), _cancellation);
}
}

Expand All @@ -172,7 +172,7 @@ public async ValueTask DeferAsync(Envelope envelope)
if (_enableRequeue && _sender is not null && envelope is PulsarEnvelope e)
{
var consumer = e.IsFromRetryConsumer && _retryConsumer != null ? _retryConsumer : _consumer;
await consumer!.Acknowledge(e.MessageData, _cancellation);
await consumer!.Acknowledge(FixMessageId(e.MessageData.MessageId, consumer.Topic), _cancellation);
await _sender.SendAsync(envelope);
}
}
Expand Down Expand Up @@ -297,14 +297,37 @@ private async Task moveToQueueAsync(Envelope envelope, Exception? exception, boo
}

// Acknowledge the original message
await sourceConsumer!.Acknowledge(e.MessageData, _cancellation);
await sourceConsumer!.Acknowledge(FixMessageId(e.MessageData.MessageId, sourceConsumer.Topic), _cancellation);

// Send copy to retry/DLQ topic
await targetProducer.Send(messageMetadata, e.MessageData.Data, _cancellation)
.ConfigureAwait(false);
}
}

/// <summary>
/// Workaround for https://github.com/apache/pulsar-dotpulsar/issues/287. DotPulsar's
/// <c>BatchHandler.Add</c> constructs the inner <see cref="MessageId" /> for each message of a
/// batch via the four-arg ctor and never sets <see cref="MessageId.Topic" />, so
/// <c>Consumer.Acknowledge</c> hits <c>_subConsumers[messageId.Topic]</c> with the empty
/// string and throws <see cref="System.Collections.Generic.KeyNotFoundException" /> on
/// partitioned topics. Reconstruct the <see cref="MessageId" /> with the partition sub-topic
/// URI built from the <see cref="IConsumer.Topic" /> the source consumer was subscribed to.
/// Using <c>consumer.Topic</c> rather than <c>_endpoint.PulsarTopic()</c> means the same
/// helper works for the retry consumer (which subscribes to <c>{baseTopic}-RETRY</c>) without
/// any branching — DotPulsar reports the actual subscribed topic per consumer.
/// </summary>
internal static MessageId FixMessageId(MessageId messageId, string consumerTopic)
{
if (string.IsNullOrEmpty(messageId.Topic) && messageId.Partition >= 0)
{
return new MessageId(messageId.LedgerId, messageId.EntryId, messageId.Partition,
messageId.BatchIndex, $"{consumerTopic}-partition-{messageId.Partition}");
}

return messageId;
}

private MessageMetadata BuildMessageMetadata(Envelope envelope, PulsarEnvelope e, Exception? exception,
bool isDeadLettered)
{
Expand Down