Skip to content

Commit

Permalink
[fix][client] Prevent retry topic and dead letter topic producer leak…
Browse files Browse the repository at this point in the history
…s when sending of message fails (#23824)

(cherry picked from commit 04e89fe)
  • Loading branch information
lhotari committed Jan 9, 2025
1 parent af97eaa commit bb11a99
Show file tree
Hide file tree
Showing 5 changed files with 423 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;

import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
Expand All @@ -40,9 +41,11 @@
import lombok.Cleanup;
import lombok.Data;
import org.apache.avro.reflect.Nullable;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
import org.apache.pulsar.client.util.RetryMessageUtil;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -1167,4 +1170,94 @@ public void testDeadLetterPolicyDeserialize() throws Exception {
consumerBuilder.loadConf(config);
assertEquals(((ConsumerBuilderImpl)consumerBuilder).getConf().getDeadLetterPolicy(), policy);
}

@Data
static class Payload {
String number;

public Payload() {

}

public Payload(String number) {
this.number = number;
}
}

@Data
static class PayloadIncompatible {
long number;

public PayloadIncompatible() {

}

public PayloadIncompatible(long number) {
this.number = number;
}
}

// reproduce issue reported in https://github.com/apache/pulsar/issues/20635#issuecomment-1709616321
@Test
public void testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak() throws Exception {
String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns");
admin.namespaces().createNamespace(namespace);
// don't enforce schema validation
admin.namespaces().setSchemaValidationEnforced(namespace, false);
// set schema compatibility strategy to always compatible
admin.namespaces().setSchemaCompatibilityStrategy(namespace, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);

Schema<Payload> schema = Schema.AVRO(Payload.class);
Schema<PayloadIncompatible> schemaIncompatible = Schema.AVRO(PayloadIncompatible.class);
String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace
+ "/testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak");
String dlqTopic = topic + "-DLQ";

// create topics
admin.topics().createNonPartitionedTopic(topic);
admin.topics().createNonPartitionedTopic(dlqTopic);

AtomicInteger nackCounter = new AtomicInteger(0);
Consumer<Payload> payloadConsumer = null;
try {
payloadConsumer = pulsarClient.newConsumer(schema).topic(topic)
.subscriptionType(SubscriptionType.Shared).subscriptionName("sub")
.ackTimeout(1, TimeUnit.SECONDS)
.negativeAckRedeliveryDelay(1, TimeUnit.MILLISECONDS)
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(3).deadLetterTopic(dlqTopic).build())
.messageListener((c, msg) -> {
if (nackCounter.incrementAndGet() < 10) {
c.negativeAcknowledge(msg);
}
}).subscribe();

// send a message to the topic with the incompatible schema
PayloadIncompatible payloadIncompatible = new PayloadIncompatible(123);
try (Producer<PayloadIncompatible> producer = pulsarClient.newProducer(schemaIncompatible).topic(topic)
.create()) {
producer.send(payloadIncompatible);
}

Thread.sleep(2000L);

assertThat(pulsar.getBrokerService().getTopicReference(dlqTopic).get().getProducers().size())
.describedAs("producer count of dlq topic %s should be <= 1 so that it doesn't leak producers",
dlqTopic)
.isLessThanOrEqualTo(1);

} finally {
if (payloadConsumer != null) {
try {
payloadConsumer.close();
} catch (PulsarClientException e) {
// ignore
}
}
}

assertThat(pulsar.getBrokerService().getTopicReference(dlqTopic).get().getProducers().size())
.describedAs("producer count of dlq topic %s should be 0 here",
dlqTopic)
.isEqualTo(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
*/
package org.apache.pulsar.client.api;

import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -36,11 +36,10 @@
import lombok.Data;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.reflect.Nullable;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.util.RetryMessageUtil;
import org.reflections.ReflectionUtils;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -617,10 +616,12 @@ public void testRetryTopicByCustomTopicName() throws Exception {

@Test(timeOut = 30000L)
public void testRetryTopicException() throws Exception {
final String topic = "persistent://my-property/my-ns/retry-topic";
String retryLetterTopic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/retry-topic");
final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/input-topic");
final int maxRedeliveryCount = 2;
final int sendMessages = 1;
// subscribe before publish
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
Expand All @@ -629,7 +630,7 @@ public void testRetryTopicException() throws Exception {
.receiverQueueSize(100)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
.retryLetterTopic(retryLetterTopic)
.build())
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
Expand All @@ -642,30 +643,16 @@ public void testRetryTopicException() throws Exception {
}
producer.close();

// mock a retry producer exception when reconsumelater is called
MultiTopicsConsumerImpl<byte[]> multiTopicsConsumer = (MultiTopicsConsumerImpl<byte[]>) consumer;
List<ConsumerImpl<byte[]>> consumers = multiTopicsConsumer.getConsumers();
for (ConsumerImpl<byte[]> c : consumers) {
Set<Field> deadLetterPolicyField =
ReflectionUtils.getAllFields(c.getClass(), ReflectionUtils.withName("deadLetterPolicy"));

if (deadLetterPolicyField.size() != 0) {
Field field = deadLetterPolicyField.iterator().next();
field.setAccessible(true);
DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) field.get(c);
deadLetterPolicy.setRetryLetterTopic("#persistent://invlaid-topic#");
}
}
admin.topics().terminateTopic(retryLetterTopic);

Message<byte[]> message = consumer.receive();
log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
try {
consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
} catch (PulsarClientException.InvalidTopicNameException e) {
assertEquals(e.getClass(), PulsarClientException.InvalidTopicNameException.class);
} catch (Exception e) {
fail("exception should be PulsarClientException.InvalidTopicNameException");
fail("exception should be PulsarClientException.TopicTerminatedException");
} catch (PulsarClientException.TopicTerminatedException e) {
// ok
}
consumer.close();
}


Expand Down Expand Up @@ -718,10 +705,12 @@ public void testRetryProducerWillCloseByConsumer() throws Exception {

@Test(timeOut = 30000L)
public void testRetryTopicExceptionWithConcurrent() throws Exception {
final String topic = "persistent://my-property/my-ns/retry-topic";
String retryLetterTopic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/retry-topic");
final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/input-topic");
final int maxRedeliveryCount = 2;
final int sendMessages = 10;
// subscribe before publish
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
Expand All @@ -730,7 +719,7 @@ public void testRetryTopicExceptionWithConcurrent() throws Exception {
.receiverQueueSize(100)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
.retryLetterTopic(retryLetterTopic)
.build())
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
Expand All @@ -739,24 +728,11 @@ public void testRetryTopicExceptionWithConcurrent() throws Exception {
.topic(topic)
.create();
for (int i = 0; i < sendMessages; i++) {
producer.newMessage().key("1").value(String.format("Hello Pulsar [%d]", i).getBytes()).send();
producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
}
producer.close();

// mock a retry producer exception when reconsumelater is called
MultiTopicsConsumerImpl<byte[]> multiTopicsConsumer = (MultiTopicsConsumerImpl<byte[]>) consumer;
List<ConsumerImpl<byte[]>> consumers = multiTopicsConsumer.getConsumers();
for (ConsumerImpl<byte[]> c : consumers) {
Set<Field> deadLetterPolicyField =
ReflectionUtils.getAllFields(c.getClass(), ReflectionUtils.withName("deadLetterPolicy"));

if (deadLetterPolicyField.size() != 0) {
Field field = deadLetterPolicyField.iterator().next();
field.setAccessible(true);
DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) field.get(c);
deadLetterPolicy.setRetryLetterTopic("#persistent://invalid-topic#");
}
}
admin.topics().terminateTopic(retryLetterTopic);

List<Message<byte[]>> messages = Lists.newArrayList();
for (int i = 0; i < sendMessages; i++) {
Expand All @@ -769,16 +745,114 @@ public void testRetryTopicExceptionWithConcurrent() throws Exception {
new Thread(() -> {
try {
consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
} catch (Exception ignore) {

} finally {
} catch (PulsarClientException.TopicTerminatedException e) {
// ok
latch.countDown();
} catch (PulsarClientException e) {
// unexpected exception
fail("unexpected exception", e);
}
}).start();
}

latch.await();
latch.await(sendMessages, TimeUnit.SECONDS);
consumer.close();
}

@Data
static class Payload {
String number;

public Payload() {

}

public Payload(String number) {
this.number = number;
}
}

@Data
static class PayloadIncompatible {
long number;

public PayloadIncompatible() {

}

public PayloadIncompatible(long number) {
this.number = number;
}
}

// reproduce similar issue as reported in https://github.com/apache/pulsar/issues/20635#issuecomment-1709616321
// but for retry topic
@Test
public void testCloseRetryLetterTopicProducerOnExceptionToPreventProducerLeak() throws Exception {
String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns");
admin.namespaces().createNamespace(namespace);
// don't enforce schema validation
admin.namespaces().setSchemaValidationEnforced(namespace, false);
// set schema compatibility strategy to always compatible
admin.namespaces().setSchemaCompatibilityStrategy(namespace, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);

Schema<Payload> schema = Schema.AVRO(Payload.class);
Schema<PayloadIncompatible> schemaIncompatible = Schema.AVRO(
PayloadIncompatible.class);
String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace
+ "/testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak");
String dlqTopic = topic + "-DLQ";
String retryTopic = topic + "-RETRY";

// create topics
admin.topics().createNonPartitionedTopic(topic);
admin.topics().createNonPartitionedTopic(dlqTopic);
admin.topics().createNonPartitionedTopic(retryTopic);

Consumer<Payload> payloadConsumer = null;
try {
payloadConsumer = pulsarClient.newConsumer(schema).topic(topic)
.subscriptionType(SubscriptionType.Shared).subscriptionName("sub")
.ackTimeout(1, TimeUnit.SECONDS)
.negativeAckRedeliveryDelay(1, TimeUnit.MILLISECONDS)
.enableRetry(true)
.deadLetterPolicy(DeadLetterPolicy.builder().retryLetterTopic(retryTopic).maxRedeliverCount(3)
.deadLetterTopic(dlqTopic).build())
.messageListener((c, msg) -> {
try {
c.reconsumeLater(msg, 1, TimeUnit.MILLISECONDS);
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}).subscribe();

// send a message to the topic with the incompatible schema
PayloadIncompatible payloadIncompatible = new PayloadIncompatible(123);
try (Producer<PayloadIncompatible> producer = pulsarClient.newProducer(schemaIncompatible).topic(topic)
.create()) {
producer.send(payloadIncompatible);
}

Thread.sleep(2000L);

assertThat(pulsar.getBrokerService().getTopicReference(retryTopic).get().getProducers().size())
.describedAs("producer count of retry topic %s should be <= 1 so that it doesn't leak producers",
retryTopic)
.isLessThanOrEqualTo(1);

} finally {
if (payloadConsumer != null) {
try {
payloadConsumer.close();
} catch (PulsarClientException e) {
// ignore
}
}
}

assertThat(pulsar.getBrokerService().getTopicReference(retryTopic).get().getProducers().size())
.describedAs("producer count of retry topic %s should be 0 here",
retryTopic)
.isEqualTo(0);
}
}
Loading

0 comments on commit bb11a99

Please sign in to comment.