Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] Prevent retry topic and dead letter topic producer leaks when sending of message fails #23824

Merged
merged 9 commits into from
Jan 9, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;

import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
Expand All @@ -40,9 +41,11 @@
import lombok.Cleanup;
import lombok.Data;
import org.apache.avro.reflect.Nullable;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
import org.apache.pulsar.client.util.RetryMessageUtil;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -1167,4 +1170,94 @@ public void testDeadLetterPolicyDeserialize() throws Exception {
consumerBuilder.loadConf(config);
assertEquals(((ConsumerBuilderImpl)consumerBuilder).getConf().getDeadLetterPolicy(), policy);
}

@Data
static class Payload {
String number;

public Payload() {

}

public Payload(String number) {
this.number = number;
}
}

@Data
static class PayloadIncompatible {
long number;

public PayloadIncompatible() {

}

public PayloadIncompatible(long number) {
this.number = number;
}
}

// reproduce issue reported in https://github.com/apache/pulsar/issues/20635#issuecomment-1709616321
@Test
public void testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak() throws Exception {
String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns");
admin.namespaces().createNamespace(namespace);
// don't enforce schema validation
admin.namespaces().setSchemaValidationEnforced(namespace, false);
// set schema compatibility strategy to always compatible
admin.namespaces().setSchemaCompatibilityStrategy(namespace, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);

Schema<Payload> schema = Schema.AVRO(Payload.class);
Schema<PayloadIncompatible> schemaIncompatible = Schema.AVRO(PayloadIncompatible.class);
String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace
+ "/testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak");
String dlqTopic = topic + "-DLQ";

// create topics
admin.topics().createNonPartitionedTopic(topic);
admin.topics().createNonPartitionedTopic(dlqTopic);

AtomicInteger nackCounter = new AtomicInteger(0);
Consumer<Payload> payloadConsumer = null;
try {
payloadConsumer = pulsarClient.newConsumer(schema).topic(topic)
.subscriptionType(SubscriptionType.Shared).subscriptionName("sub")
.ackTimeout(1, TimeUnit.SECONDS)
.negativeAckRedeliveryDelay(1, TimeUnit.MILLISECONDS)
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(3).deadLetterTopic(dlqTopic).build())
.messageListener((c, msg) -> {
if (nackCounter.incrementAndGet() < 10) {
c.negativeAcknowledge(msg);
}
}).subscribe();

// send a message to the topic with the incompatible schema
PayloadIncompatible payloadIncompatible = new PayloadIncompatible(123);
try (Producer<PayloadIncompatible> producer = pulsarClient.newProducer(schemaIncompatible).topic(topic)
.create()) {
producer.send(payloadIncompatible);
}

Thread.sleep(2000L);

assertThat(pulsar.getBrokerService().getTopicReference(dlqTopic).get().getProducers().size())
.describedAs("producer count of dlq topic %s should be <= 1 so that it doesn't leak producers",
dlqTopic)
.isLessThanOrEqualTo(1);

} finally {
if (payloadConsumer != null) {
try {
payloadConsumer.close();
} catch (PulsarClientException e) {
// ignore
}
}
}

assertThat(pulsar.getBrokerService().getTopicReference(dlqTopic).get().getProducers().size())
.describedAs("producer count of dlq topic %s should be 0 here",
dlqTopic)
.isEqualTo(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;

import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
Expand All @@ -36,10 +37,12 @@
import lombok.Data;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.reflect.Nullable;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.util.RetryMessageUtil;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.reflections.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -781,4 +784,100 @@ public void testRetryTopicExceptionWithConcurrent() throws Exception {
consumer.close();
}

@Data
static class Payload {
String number;

public Payload() {

}

public Payload(String number) {
this.number = number;
}
}

@Data
static class PayloadIncompatible {
long number;

public PayloadIncompatible() {

}

public PayloadIncompatible(long number) {
this.number = number;
}
}

// reproduce similar issue as reported in https://github.com/apache/pulsar/issues/20635#issuecomment-1709616321
// but for retry topic
@Test
public void testCloseRetryLetterTopicProducerOnExceptionToPreventProducerLeak() throws Exception {
String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns");
admin.namespaces().createNamespace(namespace);
// don't enforce schema validation
admin.namespaces().setSchemaValidationEnforced(namespace, false);
// set schema compatibility strategy to always compatible
admin.namespaces().setSchemaCompatibilityStrategy(namespace, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);

Schema<Payload> schema = Schema.AVRO(Payload.class);
Schema<PayloadIncompatible> schemaIncompatible = Schema.AVRO(
PayloadIncompatible.class);
String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace
+ "/testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak");
String dlqTopic = topic + "-DLQ";
String retryTopic = topic + "-RETRY";

// create topics
admin.topics().createNonPartitionedTopic(topic);
admin.topics().createNonPartitionedTopic(dlqTopic);
admin.topics().createNonPartitionedTopic(retryTopic);

Consumer<Payload> payloadConsumer = null;
try {
payloadConsumer = pulsarClient.newConsumer(schema).topic(topic)
.subscriptionType(SubscriptionType.Shared).subscriptionName("sub")
.ackTimeout(1, TimeUnit.SECONDS)
.negativeAckRedeliveryDelay(1, TimeUnit.MILLISECONDS)
.enableRetry(true)
.deadLetterPolicy(DeadLetterPolicy.builder().retryLetterTopic(retryTopic).maxRedeliverCount(3)
.deadLetterTopic(dlqTopic).build())
.messageListener((c, msg) -> {
try {
c.reconsumeLater(msg, 1, TimeUnit.MILLISECONDS);
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}).subscribe();

// send a message to the topic with the incompatible schema
PayloadIncompatible payloadIncompatible = new PayloadIncompatible(123);
try (Producer<PayloadIncompatible> producer = pulsarClient.newProducer(schemaIncompatible).topic(topic)
.create()) {
producer.send(payloadIncompatible);
}

Thread.sleep(2000L);

assertThat(pulsar.getBrokerService().getTopicReference(retryTopic).get().getProducers().size())
.describedAs("producer count of retry topic %s should be <= 1 so that it doesn't leak producers",
retryTopic)
.isLessThanOrEqualTo(1);

} finally {
if (payloadConsumer != null) {
try {
payloadConsumer.close();
} catch (PulsarClientException e) {
// ignore
}
}
}

assertThat(pulsar.getBrokerService().getTopicReference(retryTopic).get().getProducers().size())
.describedAs("producer count of retry topic %s should be 0 here",
retryTopic)
.isEqualTo(0);
}
}
Loading
Loading