Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] Prevent retry topic and dead letter topic producer leaks when sending of message fails #23824

Merged
merged 9 commits into from
Jan 9, 2025
Next Next commit
[fix][client] Prevent leaking producers when dead letter topic or ret…
…ry topic message sending fails
lhotari committed Jan 8, 2025
commit 11b0708c403e0ff34328be9b0403cedd223f5c81
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;

import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
@@ -40,9 +41,11 @@
import lombok.Cleanup;
import lombok.Data;
import org.apache.avro.reflect.Nullable;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
import org.apache.pulsar.client.util.RetryMessageUtil;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1167,4 +1170,94 @@ public void testDeadLetterPolicyDeserialize() throws Exception {
consumerBuilder.loadConf(config);
assertEquals(((ConsumerBuilderImpl)consumerBuilder).getConf().getDeadLetterPolicy(), policy);
}

@Data
static class Payload {
String number;

public Payload() {

}

public Payload(String number) {
this.number = number;
}
}

@Data
static class PayloadIncompatible {
long number;

public PayloadIncompatible() {

}

public PayloadIncompatible(long number) {
this.number = number;
}
}

// reproduce issue reported in https://github.com/apache/pulsar/issues/20635#issuecomment-1709616321
@Test
public void testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak() throws Exception {
String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns");
admin.namespaces().createNamespace(namespace);
// don't enforce schema validation
admin.namespaces().setSchemaValidationEnforced(namespace, false);
// set schema compatibility strategy to always compatible
admin.namespaces().setSchemaCompatibilityStrategy(namespace, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);

Schema<Payload> schema = Schema.AVRO(Payload.class);
Schema<PayloadIncompatible> schemaIncompatible = Schema.AVRO(PayloadIncompatible.class);
String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace
+ "/testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak");
String dlqTopic = topic + "-DLQ";

// create topics
admin.topics().createNonPartitionedTopic(topic);
admin.topics().createNonPartitionedTopic(dlqTopic);

AtomicInteger nackCounter = new AtomicInteger(0);
Consumer<Payload> payloadConsumer = null;
try {
payloadConsumer = pulsarClient.newConsumer(schema).topic(topic)
.subscriptionType(SubscriptionType.Shared).subscriptionName("sub")
.ackTimeout(1, TimeUnit.SECONDS)
.negativeAckRedeliveryDelay(1, TimeUnit.MILLISECONDS)
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(3).deadLetterTopic(dlqTopic).build())
.messageListener((c, msg) -> {
if (nackCounter.incrementAndGet() < 10) {
c.negativeAcknowledge(msg);
}
}).subscribe();

// send a message to the topic with the incompatible schema
PayloadIncompatible payloadIncompatible = new PayloadIncompatible(123);
try (Producer<PayloadIncompatible> producer = pulsarClient.newProducer(schemaIncompatible).topic(topic)
.create()) {
producer.send(payloadIncompatible);
}

Thread.sleep(2000L);

assertThat(pulsar.getBrokerService().getTopicReference(dlqTopic).get().getProducers().size())
.describedAs("producer count of dlq topic %s should be <= 1 so that it doesn't leak producers",
dlqTopic)
.isLessThanOrEqualTo(1);

} finally {
if (payloadConsumer != null) {
try {
payloadConsumer.close();
} catch (PulsarClientException e) {
// ignore
}
}
}

assertThat(pulsar.getBrokerService().getTopicReference(dlqTopic).get().getProducers().size())
.describedAs("producer count of dlq topic %s should be 0 here",
dlqTopic)
.isEqualTo(0);
}
}
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;

import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
@@ -36,10 +37,12 @@
import lombok.Data;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.reflect.Nullable;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.util.RetryMessageUtil;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.reflections.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -781,4 +784,100 @@ public void testRetryTopicExceptionWithConcurrent() throws Exception {
consumer.close();
}

@Data
static class Payload {
String number;

public Payload() {

}

public Payload(String number) {
this.number = number;
}
}

@Data
static class PayloadIncompatible {
long number;

public PayloadIncompatible() {

}

public PayloadIncompatible(long number) {
this.number = number;
}
}

// reproduce similar issue as reported in https://github.com/apache/pulsar/issues/20635#issuecomment-1709616321
// but for retry topic
@Test
public void testCloseRetryLetterTopicProducerOnExceptionToPreventProducerLeak() throws Exception {
String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns");
admin.namespaces().createNamespace(namespace);
// don't enforce schema validation
admin.namespaces().setSchemaValidationEnforced(namespace, false);
// set schema compatibility strategy to always compatible
admin.namespaces().setSchemaCompatibilityStrategy(namespace, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);

Schema<Payload> schema = Schema.AVRO(Payload.class);
Schema<PayloadIncompatible> schemaIncompatible = Schema.AVRO(
PayloadIncompatible.class);
String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace
+ "/testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak");
String dlqTopic = topic + "-DLQ";
String retryTopic = topic + "-RETRY";

// create topics
admin.topics().createNonPartitionedTopic(topic);
admin.topics().createNonPartitionedTopic(dlqTopic);
admin.topics().createNonPartitionedTopic(retryTopic);

Consumer<Payload> payloadConsumer = null;
try {
payloadConsumer = pulsarClient.newConsumer(schema).topic(topic)
.subscriptionType(SubscriptionType.Shared).subscriptionName("sub")
.ackTimeout(1, TimeUnit.SECONDS)
.negativeAckRedeliveryDelay(1, TimeUnit.MILLISECONDS)
.enableRetry(true)
.deadLetterPolicy(DeadLetterPolicy.builder().retryLetterTopic(retryTopic).maxRedeliverCount(3)
.deadLetterTopic(dlqTopic).build())
.messageListener((c, msg) -> {
try {
c.reconsumeLater(msg, 1, TimeUnit.MILLISECONDS);
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}).subscribe();

// send a message to the topic with the incompatible schema
PayloadIncompatible payloadIncompatible = new PayloadIncompatible(123);
try (Producer<PayloadIncompatible> producer = pulsarClient.newProducer(schemaIncompatible).topic(topic)
.create()) {
producer.send(payloadIncompatible);
}

Thread.sleep(2000L);

assertThat(pulsar.getBrokerService().getTopicReference(retryTopic).get().getProducers().size())
.describedAs("producer count of retry topic %s should be <= 1 so that it doesn't leak producers",
retryTopic)
.isLessThanOrEqualTo(1);

} finally {
if (payloadConsumer != null) {
try {
payloadConsumer.close();
} catch (PulsarClientException e) {
// ignore
}
}
}

assertThat(pulsar.getBrokerService().getTopicReference(retryTopic).get().getProducers().size())
.describedAs("producer count of retry topic %s should be 0 here",
retryTopic)
.isEqualTo(0);
}
}
Original file line number Diff line number Diff line change
@@ -728,8 +728,8 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
return null;
});
}, internalPinnedExecutor).exceptionally(ex -> {
closeDeadLetterProducer();
result.completeExceptionally(ex);
deadLetterProducer = null;
return null;
});
} else {
@@ -751,8 +751,8 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
return null;
});
}, internalPinnedExecutor).exceptionally(ex -> {
closeRetryLetterProducer();
result.completeExceptionally(ex);
retryLetterProducer = null;
return null;
});
}
@@ -2256,7 +2256,7 @@ private CompletableFuture<Boolean> processPossibleToDLQ(MessageIdAdv messageId)
}
}, internalPinnedExecutor).exceptionally(ex -> {
log.error("Dead letter producer exception with topic: {}", deadLetterPolicy.getDeadLetterTopic(), ex);
deadLetterProducer = null;
closeDeadLetterProducer();
result.complete(false);
return null;
});
@@ -2291,6 +2291,25 @@ private void initDeadLetterProducerIfNeeded() {
}
}

private void closeDeadLetterProducer() {
createProducerLock.writeLock().lock();
try {
CompletableFuture<Producer<byte[]>> previousDeadLetterProducer = deadLetterProducer;
deadLetterProducer = null;
previousDeadLetterProducer.whenComplete((producer, throwable) -> {
if (producer != null) {
producer.closeAsync().whenComplete((v, t) -> {
if (t != null) {
log.error("Failed to close dead letter producer", t);
}
});
}
});
} finally {
createProducerLock.writeLock().unlock();
}
}

private void initRetryLetterProducerIfNeeded() {
if (retryLetterProducer == null) {
createProducerLock.writeLock().lock();
@@ -2313,6 +2332,25 @@ private void initRetryLetterProducerIfNeeded() {
}
}

private void closeRetryLetterProducer() {
createProducerLock.writeLock().lock();
try {
CompletableFuture<Producer<byte[]>> previousRetryLetterProducer = retryLetterProducer;
retryLetterProducer = null;
previousRetryLetterProducer.whenComplete((producer, throwable) -> {
if (producer != null) {
producer.closeAsync().whenComplete((v, t) -> {
if (t != null) {
log.error("Failed to close retry letter producer", t);
}
});
}
});
} finally {
createProducerLock.writeLock().unlock();
}
}

@Override
public void seek(MessageId messageId) throws PulsarClientException {
try {