Skip to content

Commit 3841aec

Browse files
hanmzsrinath-ctds
authored andcommitted
[fix][client] Fix race-condition causing doReconsumeLater to hang when creating retryLetterProducer has failed (apache#23560)
(cherry picked from commit bf1f677) (cherry picked from commit 5f25367)
1 parent 86be34a commit 3841aec

File tree

2 files changed

+70
-0
lines changed

2 files changed

+70
-0
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java

+68
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.Map;
3131
import java.util.Set;
3232
import java.util.UUID;
33+
import java.util.concurrent.CountDownLatch;
3334
import java.util.concurrent.TimeUnit;
3435
import lombok.Cleanup;
3536
import lombok.Data;
@@ -45,6 +46,7 @@
4546
import org.testng.annotations.AfterMethod;
4647
import org.testng.annotations.BeforeMethod;
4748
import org.testng.annotations.Test;
49+
import org.testng.collections.Lists;
4850

4951
@Test(groups = "broker-api")
5052
public class RetryTopicTest extends ProducerConsumerBase {
@@ -713,4 +715,70 @@ public void testRetryProducerWillCloseByConsumer() throws Exception {
713715
admin.topics().delete(topicDLQ, false);
714716
}
715717

718+
719+
@Test(timeOut = 30000L)
720+
public void testRetryTopicExceptionWithConcurrent() throws Exception {
721+
final String topic = "persistent://my-property/my-ns/retry-topic";
722+
final int maxRedeliveryCount = 2;
723+
final int sendMessages = 10;
724+
// subscribe before publish
725+
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
726+
.topic(topic)
727+
.subscriptionName("my-subscription")
728+
.subscriptionType(SubscriptionType.Shared)
729+
.enableRetry(true)
730+
.receiverQueueSize(100)
731+
.deadLetterPolicy(DeadLetterPolicy.builder()
732+
.maxRedeliverCount(maxRedeliveryCount)
733+
.retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
734+
.build())
735+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
736+
.subscribe();
737+
738+
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
739+
.topic(topic)
740+
.create();
741+
for (int i = 0; i < sendMessages; i++) {
742+
producer.newMessage().key("1").value(String.format("Hello Pulsar [%d]", i).getBytes()).send();
743+
}
744+
producer.close();
745+
746+
// mock a retry producer exception when reconsumelater is called
747+
MultiTopicsConsumerImpl<byte[]> multiTopicsConsumer = (MultiTopicsConsumerImpl<byte[]>) consumer;
748+
List<ConsumerImpl<byte[]>> consumers = multiTopicsConsumer.getConsumers();
749+
for (ConsumerImpl<byte[]> c : consumers) {
750+
Set<Field> deadLetterPolicyField =
751+
ReflectionUtils.getAllFields(c.getClass(), ReflectionUtils.withName("deadLetterPolicy"));
752+
753+
if (deadLetterPolicyField.size() != 0) {
754+
Field field = deadLetterPolicyField.iterator().next();
755+
field.setAccessible(true);
756+
DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) field.get(c);
757+
deadLetterPolicy.setRetryLetterTopic("#persistent://invalid-topic#");
758+
}
759+
}
760+
761+
List<Message<byte[]>> messages = Lists.newArrayList();
762+
for (int i = 0; i < sendMessages; i++) {
763+
messages.add(consumer.receive());
764+
}
765+
766+
// mock call the reconsumeLater method concurrently
767+
CountDownLatch latch = new CountDownLatch(messages.size());
768+
for (Message<byte[]> message : messages) {
769+
new Thread(() -> {
770+
try {
771+
consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
772+
} catch (Exception ignore) {
773+
774+
} finally {
775+
latch.countDown();
776+
}
777+
}).start();
778+
}
779+
780+
latch.await();
781+
consumer.close();
782+
}
783+
716784
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

+2
Original file line numberDiff line numberDiff line change
@@ -708,6 +708,8 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
708708
} catch (Exception e) {
709709
result.completeExceptionally(e);
710710
}
711+
} else {
712+
result.completeExceptionally(new PulsarClientException("Retry letter producer is null."));
711713
}
712714
MessageId finalMessageId = messageId;
713715
result.exceptionally(ex -> {

0 commit comments

Comments
 (0)