From 11b0708c403e0ff34328be9b0403cedd223f5c81 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 8 Jan 2025 14:00:42 +0200 Subject: [PATCH 1/8] [fix][client] Prevent leaking producers when dead letter topic or retry topic message sending fails --- .../client/api/DeadLetterTopicTest.java | 93 +++++++++++++++++ .../pulsar/client/api/RetryTopicTest.java | 99 +++++++++++++++++++ .../pulsar/client/impl/ConsumerImpl.java | 44 ++++++++- 3 files changed, 233 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java index e46fddeacc117..ab26949c04fc6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.api; +import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; @@ -40,9 +41,11 @@ import lombok.Cleanup; import lombok.Data; import org.apache.avro.reflect.Nullable; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.impl.ConsumerBuilderImpl; import org.apache.pulsar.client.util.RetryMessageUtil; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1167,4 +1170,94 @@ public void testDeadLetterPolicyDeserialize() throws Exception { consumerBuilder.loadConf(config); assertEquals(((ConsumerBuilderImpl)consumerBuilder).getConf().getDeadLetterPolicy(), policy); } + + @Data + static class Payload { + String number; + + public Payload() { + + } + + public Payload(String number) { + this.number = number; + } + } + + @Data + static class PayloadIncompatible { + long number; + + public PayloadIncompatible() { + + } + + public PayloadIncompatible(long number) { + this.number = number; + } + } + + // reproduce issue reported in https://github.com/apache/pulsar/issues/20635#issuecomment-1709616321 + @Test + public void testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak() throws Exception { + String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns"); + admin.namespaces().createNamespace(namespace); + // don't enforce schema validation + admin.namespaces().setSchemaValidationEnforced(namespace, false); + // set schema compatibility strategy to always compatible + admin.namespaces().setSchemaCompatibilityStrategy(namespace, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + + Schema schema = Schema.AVRO(Payload.class); + Schema schemaIncompatible = Schema.AVRO(PayloadIncompatible.class); + String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace + + "/testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak"); + String dlqTopic = topic + "-DLQ"; + + // create topics + admin.topics().createNonPartitionedTopic(topic); + admin.topics().createNonPartitionedTopic(dlqTopic); + + AtomicInteger nackCounter = new AtomicInteger(0); + Consumer payloadConsumer = null; + try { + payloadConsumer = pulsarClient.newConsumer(schema).topic(topic) + .subscriptionType(SubscriptionType.Shared).subscriptionName("sub") + .ackTimeout(1, TimeUnit.SECONDS) + .negativeAckRedeliveryDelay(1, TimeUnit.MILLISECONDS) + .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(3).deadLetterTopic(dlqTopic).build()) + .messageListener((c, msg) -> { + if (nackCounter.incrementAndGet() < 10) { + c.negativeAcknowledge(msg); + } + }).subscribe(); + + // send a message to the topic with the incompatible schema + PayloadIncompatible payloadIncompatible = new PayloadIncompatible(123); + try (Producer producer = pulsarClient.newProducer(schemaIncompatible).topic(topic) + .create()) { + producer.send(payloadIncompatible); + } + + Thread.sleep(2000L); + + assertThat(pulsar.getBrokerService().getTopicReference(dlqTopic).get().getProducers().size()) + .describedAs("producer count of dlq topic %s should be <= 1 so that it doesn't leak producers", + dlqTopic) + .isLessThanOrEqualTo(1); + + } finally { + if (payloadConsumer != null) { + try { + payloadConsumer.close(); + } catch (PulsarClientException e) { + // ignore + } + } + } + + assertThat(pulsar.getBrokerService().getTopicReference(dlqTopic).get().getProducers().size()) + .describedAs("producer count of dlq topic %s should be 0 here", + dlqTopic) + .isEqualTo(0); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java index cd598585c8e87..9f08819374582 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.api; +import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNull; @@ -36,10 +37,12 @@ import lombok.Data; import org.apache.avro.AvroRuntimeException; import org.apache.avro.reflect.Nullable; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.util.RetryMessageUtil; +import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.reflections.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -781,4 +784,100 @@ public void testRetryTopicExceptionWithConcurrent() throws Exception { consumer.close(); } + @Data + static class Payload { + String number; + + public Payload() { + + } + + public Payload(String number) { + this.number = number; + } + } + + @Data + static class PayloadIncompatible { + long number; + + public PayloadIncompatible() { + + } + + public PayloadIncompatible(long number) { + this.number = number; + } + } + + // reproduce similar issue as reported in https://github.com/apache/pulsar/issues/20635#issuecomment-1709616321 + // but for retry topic + @Test + public void testCloseRetryLetterTopicProducerOnExceptionToPreventProducerLeak() throws Exception { + String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns"); + admin.namespaces().createNamespace(namespace); + // don't enforce schema validation + admin.namespaces().setSchemaValidationEnforced(namespace, false); + // set schema compatibility strategy to always compatible + admin.namespaces().setSchemaCompatibilityStrategy(namespace, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); + + Schema schema = Schema.AVRO(Payload.class); + Schema schemaIncompatible = Schema.AVRO( + PayloadIncompatible.class); + String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace + + "/testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak"); + String dlqTopic = topic + "-DLQ"; + String retryTopic = topic + "-RETRY"; + + // create topics + admin.topics().createNonPartitionedTopic(topic); + admin.topics().createNonPartitionedTopic(dlqTopic); + admin.topics().createNonPartitionedTopic(retryTopic); + + Consumer payloadConsumer = null; + try { + payloadConsumer = pulsarClient.newConsumer(schema).topic(topic) + .subscriptionType(SubscriptionType.Shared).subscriptionName("sub") + .ackTimeout(1, TimeUnit.SECONDS) + .negativeAckRedeliveryDelay(1, TimeUnit.MILLISECONDS) + .enableRetry(true) + .deadLetterPolicy(DeadLetterPolicy.builder().retryLetterTopic(retryTopic).maxRedeliverCount(3) + .deadLetterTopic(dlqTopic).build()) + .messageListener((c, msg) -> { + try { + c.reconsumeLater(msg, 1, TimeUnit.MILLISECONDS); + } catch (PulsarClientException e) { + throw new RuntimeException(e); + } + }).subscribe(); + + // send a message to the topic with the incompatible schema + PayloadIncompatible payloadIncompatible = new PayloadIncompatible(123); + try (Producer producer = pulsarClient.newProducer(schemaIncompatible).topic(topic) + .create()) { + producer.send(payloadIncompatible); + } + + Thread.sleep(2000L); + + assertThat(pulsar.getBrokerService().getTopicReference(retryTopic).get().getProducers().size()) + .describedAs("producer count of retry topic %s should be <= 1 so that it doesn't leak producers", + retryTopic) + .isLessThanOrEqualTo(1); + + } finally { + if (payloadConsumer != null) { + try { + payloadConsumer.close(); + } catch (PulsarClientException e) { + // ignore + } + } + } + + assertThat(pulsar.getBrokerService().getTopicReference(retryTopic).get().getProducers().size()) + .describedAs("producer count of retry topic %s should be 0 here", + retryTopic) + .isEqualTo(0); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 86af4bdaf58c8..b263ec65f0328 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -728,8 +728,8 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a return null; }); }, internalPinnedExecutor).exceptionally(ex -> { + closeDeadLetterProducer(); result.completeExceptionally(ex); - deadLetterProducer = null; return null; }); } else { @@ -751,8 +751,8 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a return null; }); }, internalPinnedExecutor).exceptionally(ex -> { + closeRetryLetterProducer(); result.completeExceptionally(ex); - retryLetterProducer = null; return null; }); } @@ -2256,7 +2256,7 @@ private CompletableFuture processPossibleToDLQ(MessageIdAdv messageId) } }, internalPinnedExecutor).exceptionally(ex -> { log.error("Dead letter producer exception with topic: {}", deadLetterPolicy.getDeadLetterTopic(), ex); - deadLetterProducer = null; + closeDeadLetterProducer(); result.complete(false); return null; }); @@ -2291,6 +2291,25 @@ private void initDeadLetterProducerIfNeeded() { } } + private void closeDeadLetterProducer() { + createProducerLock.writeLock().lock(); + try { + CompletableFuture> previousDeadLetterProducer = deadLetterProducer; + deadLetterProducer = null; + previousDeadLetterProducer.whenComplete((producer, throwable) -> { + if (producer != null) { + producer.closeAsync().whenComplete((v, t) -> { + if (t != null) { + log.error("Failed to close dead letter producer", t); + } + }); + } + }); + } finally { + createProducerLock.writeLock().unlock(); + } + } + private void initRetryLetterProducerIfNeeded() { if (retryLetterProducer == null) { createProducerLock.writeLock().lock(); @@ -2313,6 +2332,25 @@ private void initRetryLetterProducerIfNeeded() { } } + private void closeRetryLetterProducer() { + createProducerLock.writeLock().lock(); + try { + CompletableFuture> previousRetryLetterProducer = retryLetterProducer; + retryLetterProducer = null; + previousRetryLetterProducer.whenComplete((producer, throwable) -> { + if (producer != null) { + producer.closeAsync().whenComplete((v, t) -> { + if (t != null) { + log.error("Failed to close retry letter producer", t); + } + }); + } + }); + } finally { + createProducerLock.writeLock().unlock(); + } + } + @Override public void seek(MessageId messageId) throws PulsarClientException { try { From 03f8b4321fe6d091fcb707a073219c48155d5add Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 8 Jan 2025 14:48:37 +0200 Subject: [PATCH 2/8] Implement backoff for creating dead letter topic and retry letter topic producers --- .../pulsar/client/impl/ConsumerImpl.java | 106 +++++++++++++----- 1 file changed, 76 insertions(+), 30 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index b263ec65f0328..eee0bc67e75af 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -67,6 +67,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import lombok.AccessLevel; import lombok.Getter; @@ -202,8 +203,9 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private final DeadLetterPolicy deadLetterPolicy; private volatile CompletableFuture> deadLetterProducer; - + private volatile int deadLetterProducerFailureCount; private volatile CompletableFuture> retryLetterProducer; + private volatile int retryLetterProducerFailureCount; private final ReadWriteLock createProducerLock = new ReentrantReadWriteLock(); protected volatile boolean paused; @@ -715,6 +717,7 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a .properties(propertiesMap); copyMessageKeysIfNeeded(message, typedMessageBuilderNew); typedMessageBuilderNew.sendAsync().thenAccept(msgId -> { + deadLetterProducerFailureCount = 0; consumerDlqMessagesCounter.increment(); doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null).thenAccept(v -> { @@ -728,7 +731,7 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a return null; }); }, internalPinnedExecutor).exceptionally(ex -> { - closeDeadLetterProducer(); + closeDeadLetterProducerAfterException(); result.completeExceptionally(ex); return null; }); @@ -745,13 +748,16 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a copyMessageKeysIfNeeded(message, typedMessageBuilderNew); typedMessageBuilderNew.sendAsync() .thenCompose(__ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null)) - .thenAccept(v -> result.complete(null)) + .thenAccept(v -> { + retryLetterProducerFailureCount = 0; + result.complete(null); + }) .exceptionally(ex -> { result.completeExceptionally(ex); return null; }); }, internalPinnedExecutor).exceptionally(ex -> { - closeRetryLetterProducer(); + closeRetryLetterProducerAfterException(); result.completeExceptionally(ex); return null; }); @@ -2229,6 +2235,7 @@ private CompletableFuture processPossibleToDLQ(MessageIdAdv messageId) copyMessageKeysIfNeeded(message, typedMessageBuilderNew); typedMessageBuilderNew.sendAsync() .thenAccept(messageIdInDLQ -> { + deadLetterProducerFailureCount = 0; possibleSendToDeadLetterTopicMessages.remove(messageId); acknowledgeAsync(messageId).whenComplete((v, ex) -> { if (ex != null) { @@ -2256,7 +2263,7 @@ private CompletableFuture processPossibleToDLQ(MessageIdAdv messageId) } }, internalPinnedExecutor).exceptionally(ex -> { log.error("Dead letter producer exception with topic: {}", deadLetterPolicy.getDeadLetterTopic(), ex); - closeDeadLetterProducer(); + closeDeadLetterProducerAfterException(); result.complete(false); return null; }); @@ -2271,19 +2278,24 @@ private void initDeadLetterProducerIfNeeded() { createProducerLock.writeLock().lock(); try { if (deadLetterProducer == null) { - deadLetterProducer = - ((ProducerBuilderImpl) client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema))) - .initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName()) - .topic(this.deadLetterPolicy.getDeadLetterTopic()) - .producerName(String.format("%s-%s-%s-%s-DLQ", this.topicName, this.subscription, - this.consumerName, RandomStringUtils.randomAlphanumeric(5))) - .blockIfQueueFull(false) - .enableBatching(false) - .enableChunking(true) - .createAsync(); - deadLetterProducer.thenAccept(dlqProducer -> { - stats.setDeadLetterProducerStats(dlqProducer.getStats()); - }); + deadLetterProducer = createProducerWithBackOff(() -> { + CompletableFuture> newProducer = + ((ProducerBuilderImpl) client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema))) + .initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName()) + .topic(this.deadLetterPolicy.getDeadLetterTopic()) + .producerName( + String.format("%s-%s-%s-%s-DLQ", this.topicName, this.subscription, + this.consumerName, RandomStringUtils.randomAlphanumeric(5))) + .blockIfQueueFull(false) + .enableBatching(false) + .enableChunking(true) + .createAsync(); + newProducer.thenAccept(dlqProducer -> { + stats.setDeadLetterProducerStats(dlqProducer.getStats()); + }); + return newProducer; + }, deadLetterProducerFailureCount, () -> "dead letter producer (topic: " + + deadLetterPolicy.getDeadLetterTopic() + ")"); } } finally { createProducerLock.writeLock().unlock(); @@ -2291,11 +2303,40 @@ private void initDeadLetterProducerIfNeeded() { } } - private void closeDeadLetterProducer() { + private CompletableFuture> createProducerWithBackOff( + Supplier>> producerSupplier, int failureCount, + Supplier logDescription) { + if (failureCount == 0) { + return producerSupplier.get(); + } else { + // calculate backoff time for given failure count + Backoff backoff = new BackoffBuilder() + .setInitialTime(100, TimeUnit.MILLISECONDS) + .setMandatoryStop(client.getConfiguration().getOperationTimeoutMs() * 2, + TimeUnit.MILLISECONDS) + .setMax(1, TimeUnit.MINUTES) + .create(); + long backoffTimeMillis = 0; + for (int i = 0; i < failureCount; i++) { + backoffTimeMillis = backoff.next(); + } + CompletableFuture> newProducer = new CompletableFuture<>(); + ScheduledExecutorService executor = + (ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor(this); + log.info("Creating {} with backoff time of {} ms", logDescription.get(), backoffTimeMillis); + executor.schedule(() -> { + FutureUtil.completeAfter(newProducer, producerSupplier.get()); + }, backoffTimeMillis, TimeUnit.MILLISECONDS); + return newProducer; + } + } + + private void closeDeadLetterProducerAfterException() { createProducerLock.writeLock().lock(); try { CompletableFuture> previousDeadLetterProducer = deadLetterProducer; deadLetterProducer = null; + deadLetterProducerFailureCount++; previousDeadLetterProducer.whenComplete((producer, throwable) -> { if (producer != null) { producer.closeAsync().whenComplete((v, t) -> { @@ -2315,16 +2356,20 @@ private void initRetryLetterProducerIfNeeded() { createProducerLock.writeLock().lock(); try { if (retryLetterProducer == null) { - retryLetterProducer = client - .newProducer(Schema.AUTO_PRODUCE_BYTES(schema)) - .topic(this.deadLetterPolicy.getRetryLetterTopic()) - .enableBatching(false) - .enableChunking(true) - .blockIfQueueFull(false) - .createAsync(); - retryLetterProducer.thenAccept(rtlProducer -> { - stats.setRetryLetterProducerStats(rtlProducer.getStats()); - }); + retryLetterProducer = createProducerWithBackOff(() -> { + CompletableFuture> newProducer = client + .newProducer(Schema.AUTO_PRODUCE_BYTES(schema)) + .topic(this.deadLetterPolicy.getRetryLetterTopic()) + .enableBatching(false) + .enableChunking(true) + .blockIfQueueFull(false) + .createAsync(); + newProducer.thenAccept(rtlProducer -> { + stats.setRetryLetterProducerStats(rtlProducer.getStats()); + }); + return newProducer; + }, retryLetterProducerFailureCount, () -> "retry letter producer (topic: " + + deadLetterPolicy.getRetryLetterTopic() + ")"); } } finally { createProducerLock.writeLock().unlock(); @@ -2332,11 +2377,12 @@ private void initRetryLetterProducerIfNeeded() { } } - private void closeRetryLetterProducer() { + private void closeRetryLetterProducerAfterException() { createProducerLock.writeLock().lock(); try { CompletableFuture> previousRetryLetterProducer = retryLetterProducer; retryLetterProducer = null; + retryLetterProducerFailureCount++; previousRetryLetterProducer.whenComplete((producer, throwable) -> { if (producer != null) { producer.closeAsync().whenComplete((v, t) -> { From eaf0eff6e7edca8e1a2a2a3b9cc794b3eae9087c Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 8 Jan 2025 15:52:21 +0200 Subject: [PATCH 3/8] Skip spotbugs error about VO_VOLATILE_INCREMENT --- pulsar-client/src/main/resources/findbugsExclude.xml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pulsar-client/src/main/resources/findbugsExclude.xml b/pulsar-client/src/main/resources/findbugsExclude.xml index 0e05d20cb9bb4..e41dd4eca58d6 100644 --- a/pulsar-client/src/main/resources/findbugsExclude.xml +++ b/pulsar-client/src/main/resources/findbugsExclude.xml @@ -1043,4 +1043,14 @@ + + + + + + + + + + From 481f654c76de951f2d676bfd3944e5f73a8a3fbb Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 8 Jan 2025 16:04:34 +0200 Subject: [PATCH 4/8] Add null check --- .../pulsar/client/impl/ConsumerImpl.java | 40 ++++++++++--------- 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index eee0bc67e75af..a5fb45ef67906 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -2337,15 +2337,17 @@ private void closeDeadLetterProducerAfterException() { CompletableFuture> previousDeadLetterProducer = deadLetterProducer; deadLetterProducer = null; deadLetterProducerFailureCount++; - previousDeadLetterProducer.whenComplete((producer, throwable) -> { - if (producer != null) { - producer.closeAsync().whenComplete((v, t) -> { - if (t != null) { - log.error("Failed to close dead letter producer", t); - } - }); - } - }); + if (previousDeadLetterProducer != null) { + previousDeadLetterProducer.whenComplete((producer, throwable) -> { + if (producer != null) { + producer.closeAsync().whenComplete((v, t) -> { + if (t != null) { + log.error("Failed to close dead letter producer", t); + } + }); + } + }); + } } finally { createProducerLock.writeLock().unlock(); } @@ -2383,15 +2385,17 @@ private void closeRetryLetterProducerAfterException() { CompletableFuture> previousRetryLetterProducer = retryLetterProducer; retryLetterProducer = null; retryLetterProducerFailureCount++; - previousRetryLetterProducer.whenComplete((producer, throwable) -> { - if (producer != null) { - producer.closeAsync().whenComplete((v, t) -> { - if (t != null) { - log.error("Failed to close retry letter producer", t); - } - }); - } - }); + if (previousRetryLetterProducer != null) { + previousRetryLetterProducer.whenComplete((producer, throwable) -> { + if (producer != null) { + producer.closeAsync().whenComplete((v, t) -> { + if (t != null) { + log.error("Failed to close retry letter producer", t); + } + }); + } + }); + } } finally { createProducerLock.writeLock().unlock(); } From f55830a5a66ffbf617a2942471606ef10e8b73a9 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 8 Jan 2025 16:13:14 +0200 Subject: [PATCH 5/8] Reduce code duplication --- .../pulsar/client/impl/ConsumerImpl.java | 38 ++++++++----------- 1 file changed, 16 insertions(+), 22 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index a5fb45ef67906..14d1b9cae4590 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -2337,22 +2337,26 @@ private void closeDeadLetterProducerAfterException() { CompletableFuture> previousDeadLetterProducer = deadLetterProducer; deadLetterProducer = null; deadLetterProducerFailureCount++; - if (previousDeadLetterProducer != null) { - previousDeadLetterProducer.whenComplete((producer, throwable) -> { - if (producer != null) { - producer.closeAsync().whenComplete((v, t) -> { - if (t != null) { - log.error("Failed to close dead letter producer", t); - } - }); - } - }); - } + closeProducerFuture(previousDeadLetterProducer); } finally { createProducerLock.writeLock().unlock(); } } + private static void closeProducerFuture(CompletableFuture> producerFuture) { + if (producerFuture != null) { + producerFuture.whenComplete((producer, throwable) -> { + if (producer != null) { + producer.closeAsync().whenComplete((v, t) -> { + if (t != null) { + log.error("Failed to close producer", t); + } + }); + } + }); + } + } + private void initRetryLetterProducerIfNeeded() { if (retryLetterProducer == null) { createProducerLock.writeLock().lock(); @@ -2385,17 +2389,7 @@ private void closeRetryLetterProducerAfterException() { CompletableFuture> previousRetryLetterProducer = retryLetterProducer; retryLetterProducer = null; retryLetterProducerFailureCount++; - if (previousRetryLetterProducer != null) { - previousRetryLetterProducer.whenComplete((producer, throwable) -> { - if (producer != null) { - producer.closeAsync().whenComplete((v, t) -> { - if (t != null) { - log.error("Failed to close retry letter producer", t); - } - }); - } - }); - } + closeProducerFuture(previousRetryLetterProducer); } finally { createProducerLock.writeLock().unlock(); } From 615dd41baa2f0c74b7460ab316eccf9c6189e24f Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 9 Jan 2025 11:39:31 +0200 Subject: [PATCH 6/8] Revisit logic based on review feedback --- .../pulsar/client/impl/ConsumerImpl.java | 242 +++++++++--------- 1 file changed, 119 insertions(+), 123 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 14d1b9cae4590..00b9b31d2a063 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -684,9 +684,8 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a return FutureUtil.failedFuture(exception); } - initRetryLetterProducerIfNeeded(); CompletableFuture result = new CompletableFuture<>(); - if (retryLetterProducer != null) { + if (initRetryLetterProducerIfNeeded() != null) { try { MessageImpl retryMessage = (MessageImpl) getMessageImpl(message); String originMessageIdStr = message.getMessageId().toString(); @@ -709,55 +708,60 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a MessageId finalMessageId = messageId; if (reconsumeTimes > this.deadLetterPolicy.getMaxRedeliverCount() && StringUtils.isNotBlank(deadLetterPolicy.getDeadLetterTopic())) { - initDeadLetterProducerIfNeeded(); - deadLetterProducer.thenAcceptAsync(dlqProducer -> { - TypedMessageBuilder typedMessageBuilderNew = - dlqProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(retryMessage.getReaderSchema().get())) - .value(retryMessage.getData()) - .properties(propertiesMap); - copyMessageKeysIfNeeded(message, typedMessageBuilderNew); - typedMessageBuilderNew.sendAsync().thenAccept(msgId -> { - deadLetterProducerFailureCount = 0; - consumerDlqMessagesCounter.increment(); - - doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null).thenAccept(v -> { - result.complete(null); + initDeadLetterProducerIfNeeded().thenAcceptAsync(dlqProducer -> { + try { + TypedMessageBuilder typedMessageBuilderNew = + dlqProducer.newMessage( + Schema.AUTO_PRODUCE_BYTES(retryMessage.getReaderSchema().get())) + .value(retryMessage.getData()) + .properties(propertiesMap); + copyMessageKeysIfNeeded(message, typedMessageBuilderNew); + typedMessageBuilderNew.sendAsync().thenAccept(msgId -> { + consumerDlqMessagesCounter.increment(); + + doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null).thenAccept(v -> { + result.complete(null); + }).exceptionally(ex -> { + result.completeExceptionally(ex); + return null; + }); }).exceptionally(ex -> { result.completeExceptionally(ex); return null; }); - }).exceptionally(ex -> { - result.completeExceptionally(ex); - return null; - }); + } catch (Exception e) { + result.completeExceptionally(e); + } }, internalPinnedExecutor).exceptionally(ex -> { - closeDeadLetterProducerAfterException(); result.completeExceptionally(ex); return null; }); } else { assert retryMessage != null; - retryLetterProducer.thenAcceptAsync(rtlProducer -> { - TypedMessageBuilder typedMessageBuilderNew = rtlProducer - .newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())) - .value(retryMessage.getData()) - .properties(propertiesMap); - if (delayTime > 0) { - typedMessageBuilderNew.deliverAfter(delayTime, unit); + initRetryLetterProducerIfNeeded().thenAcceptAsync(rtlProducer -> { + try { + TypedMessageBuilder typedMessageBuilderNew = rtlProducer + .newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())) + .value(retryMessage.getData()) + .properties(propertiesMap); + if (delayTime > 0) { + typedMessageBuilderNew.deliverAfter(delayTime, unit); + } + copyMessageKeysIfNeeded(message, typedMessageBuilderNew); + typedMessageBuilderNew.sendAsync() + .thenCompose( + __ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null)) + .thenAccept(v -> { + result.complete(null); + }) + .exceptionally(ex -> { + result.completeExceptionally(ex); + return null; + }); + } catch (Exception e) { + result.completeExceptionally(e); } - copyMessageKeysIfNeeded(message, typedMessageBuilderNew); - typedMessageBuilderNew.sendAsync() - .thenCompose(__ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null)) - .thenAccept(v -> { - retryLetterProducerFailureCount = 0; - result.complete(null); - }) - .exceptionally(ex -> { - result.completeExceptionally(ex); - return null; - }); }, internalPinnedExecutor).exceptionally(ex -> { - closeRetryLetterProducerAfterException(); result.completeExceptionally(ex); return null; }); @@ -2222,48 +2226,54 @@ private CompletableFuture processPossibleToDLQ(MessageIdAdv messageId) } CompletableFuture result = new CompletableFuture<>(); if (deadLetterMessages != null) { - initDeadLetterProducerIfNeeded(); List> finalDeadLetterMessages = deadLetterMessages; - deadLetterProducer.thenAcceptAsync(producerDLQ -> { + initDeadLetterProducerIfNeeded().thenAcceptAsync(producerDLQ -> { for (MessageImpl message : finalDeadLetterMessages) { - String originMessageIdStr = message.getMessageId().toString(); - String originTopicNameStr = getOriginTopicNameStr(message); - TypedMessageBuilder typedMessageBuilderNew = - producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())) - .value(message.getData()) - .properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr)); - copyMessageKeysIfNeeded(message, typedMessageBuilderNew); - typedMessageBuilderNew.sendAsync() - .thenAccept(messageIdInDLQ -> { - deadLetterProducerFailureCount = 0; - possibleSendToDeadLetterTopicMessages.remove(messageId); - acknowledgeAsync(messageId).whenComplete((v, ex) -> { - if (ex != null) { - log.warn("[{}] [{}] [{}] Failed to acknowledge the message {} of the original" - + " topic but send to the DLQ successfully.", - topicName, subscription, consumerName, messageId, ex); - result.complete(false); + try { + String originMessageIdStr = message.getMessageId().toString(); + String originTopicNameStr = getOriginTopicNameStr(message); + TypedMessageBuilder typedMessageBuilderNew = + producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())) + .value(message.getData()) + .properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr)); + copyMessageKeysIfNeeded(message, typedMessageBuilderNew); + typedMessageBuilderNew.sendAsync() + .thenAccept(messageIdInDLQ -> { + possibleSendToDeadLetterTopicMessages.remove(messageId); + acknowledgeAsync(messageId).whenComplete((v, ex) -> { + if (ex != null) { + log.warn( + "[{}] [{}] [{}] Failed to acknowledge the message {} of the " + + "original topic but send to the DLQ successfully.", + topicName, subscription, consumerName, messageId, ex); + result.complete(false); + } else { + result.complete(true); + } + }); + }).exceptionally(ex -> { + if (ex instanceof PulsarClientException.ProducerQueueIsFullError) { + log.warn( + "[{}] [{}] [{}] Failed to send DLQ message to {} for message id {}: {}", + topicName, subscription, consumerName, + deadLetterPolicy.getDeadLetterTopic(), messageId, ex.getMessage()); } else { - result.complete(true); + log.warn("[{}] [{}] [{}] Failed to send DLQ message to {} for message id {}", + topicName, subscription, consumerName, + deadLetterPolicy.getDeadLetterTopic(), messageId, ex); } + result.complete(false); + return null; }); - }).exceptionally(ex -> { - if (ex instanceof PulsarClientException.ProducerQueueIsFullError) { - log.warn("[{}] [{}] [{}] Failed to send DLQ message to {} for message id {}: {}", - topicName, subscription, consumerName, - deadLetterPolicy.getDeadLetterTopic(), messageId, ex.getMessage()); - } else { - log.warn("[{}] [{}] [{}] Failed to send DLQ message to {} for message id {}", - topicName, subscription, consumerName, - deadLetterPolicy.getDeadLetterTopic(), messageId, ex); - } - result.complete(false); - return null; - }); + } catch (Exception e) { + log.warn("[{}] [{}] [{}] Failed to send DLQ message to {} for message id {}", + topicName, subscription, consumerName, deadLetterPolicy.getDeadLetterTopic(), messageId, + e); + result.complete(false); + } } }, internalPinnedExecutor).exceptionally(ex -> { log.error("Dead letter producer exception with topic: {}", deadLetterPolicy.getDeadLetterTopic(), ex); - closeDeadLetterProducerAfterException(); result.complete(false); return null; }); @@ -2273,12 +2283,14 @@ private CompletableFuture processPossibleToDLQ(MessageIdAdv messageId) return result; } - private void initDeadLetterProducerIfNeeded() { - if (deadLetterProducer == null) { + private CompletableFuture> initDeadLetterProducerIfNeeded() { + CompletableFuture> p = deadLetterProducer; + if (p == null || p.isCompletedExceptionally()) { createProducerLock.writeLock().lock(); try { - if (deadLetterProducer == null) { - deadLetterProducer = createProducerWithBackOff(() -> { + p = deadLetterProducer; + if (p == null || p.isCompletedExceptionally()) { + p = createProducerWithBackOff(() -> { CompletableFuture> newProducer = ((ProducerBuilderImpl) client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema))) .initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName()) @@ -2290,17 +2302,27 @@ private void initDeadLetterProducerIfNeeded() { .enableBatching(false) .enableChunking(true) .createAsync(); - newProducer.thenAccept(dlqProducer -> { - stats.setDeadLetterProducerStats(dlqProducer.getStats()); + newProducer.whenComplete((producer, ex) -> { + if (ex != null) { + log.error("[{}] [{}] [{}] Failed to create dead letter producer for topic {}", + topicName, subscription, consumerName, deadLetterPolicy.getDeadLetterTopic(), + ex); + deadLetterProducerFailureCount++; + } else { + deadLetterProducerFailureCount = 0; + stats.setDeadLetterProducerStats(producer.getStats()); + } }); return newProducer; }, deadLetterProducerFailureCount, () -> "dead letter producer (topic: " + deadLetterPolicy.getDeadLetterTopic() + ")"); + deadLetterProducer = p; } } finally { createProducerLock.writeLock().unlock(); } } + return p; } private CompletableFuture> createProducerWithBackOff( @@ -2331,38 +2353,14 @@ private CompletableFuture> createProducerWithBackOff( } } - private void closeDeadLetterProducerAfterException() { - createProducerLock.writeLock().lock(); - try { - CompletableFuture> previousDeadLetterProducer = deadLetterProducer; - deadLetterProducer = null; - deadLetterProducerFailureCount++; - closeProducerFuture(previousDeadLetterProducer); - } finally { - createProducerLock.writeLock().unlock(); - } - } - - private static void closeProducerFuture(CompletableFuture> producerFuture) { - if (producerFuture != null) { - producerFuture.whenComplete((producer, throwable) -> { - if (producer != null) { - producer.closeAsync().whenComplete((v, t) -> { - if (t != null) { - log.error("Failed to close producer", t); - } - }); - } - }); - } - } - - private void initRetryLetterProducerIfNeeded() { - if (retryLetterProducer == null) { + private CompletableFuture> initRetryLetterProducerIfNeeded() { + CompletableFuture> p = retryLetterProducer; + if (p == null || p.isCompletedExceptionally()) { createProducerLock.writeLock().lock(); try { - if (retryLetterProducer == null) { - retryLetterProducer = createProducerWithBackOff(() -> { + p = retryLetterProducer; + if (p == null || p.isCompletedExceptionally()) { + p = createProducerWithBackOff(() -> { CompletableFuture> newProducer = client .newProducer(Schema.AUTO_PRODUCE_BYTES(schema)) .topic(this.deadLetterPolicy.getRetryLetterTopic()) @@ -2370,29 +2368,27 @@ private void initRetryLetterProducerIfNeeded() { .enableChunking(true) .blockIfQueueFull(false) .createAsync(); - newProducer.thenAccept(rtlProducer -> { - stats.setRetryLetterProducerStats(rtlProducer.getStats()); + newProducer.whenComplete((producer, ex) -> { + if (ex != null) { + log.error("[{}] [{}] [{}] Failed to create retry letter producer for topic {}", + topicName, subscription, consumerName, deadLetterPolicy.getRetryLetterTopic(), + ex); + retryLetterProducerFailureCount++; + } else { + retryLetterProducerFailureCount = 0; + stats.setRetryLetterProducerStats(producer.getStats()); + } }); return newProducer; }, retryLetterProducerFailureCount, () -> "retry letter producer (topic: " + deadLetterPolicy.getRetryLetterTopic() + ")"); + retryLetterProducer = p; } } finally { createProducerLock.writeLock().unlock(); } } - } - - private void closeRetryLetterProducerAfterException() { - createProducerLock.writeLock().lock(); - try { - CompletableFuture> previousRetryLetterProducer = retryLetterProducer; - retryLetterProducer = null; - retryLetterProducerFailureCount++; - closeProducerFuture(previousRetryLetterProducer); - } finally { - createProducerLock.writeLock().unlock(); - } + return p; } @Override From 16c82e441dadc15eb472aadd7e6d4e951e37ece4 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 9 Jan 2025 14:23:58 +0200 Subject: [PATCH 7/8] Ignore VO_VOLATILE_INCREMENT in ConsumerImpl --- pulsar-client/src/main/resources/findbugsExclude.xml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pulsar-client/src/main/resources/findbugsExclude.xml b/pulsar-client/src/main/resources/findbugsExclude.xml index e41dd4eca58d6..f7cf6b9cfd50e 100644 --- a/pulsar-client/src/main/resources/findbugsExclude.xml +++ b/pulsar-client/src/main/resources/findbugsExclude.xml @@ -1045,12 +1045,6 @@ - - - - - - From 1e509d022c69b500a49e60ef045afdba90194b69 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 9 Jan 2025 14:54:28 +0200 Subject: [PATCH 8/8] Fix invalid tests in RetryTopicTest and fix MultiTopicsConsumerImpl.close --- .../pulsar/client/api/RetryTopicTest.java | 67 ++++++------------- .../pulsar/client/impl/ConsumerImpl.java | 41 ++++++------ .../client/impl/MultiTopicsConsumerImpl.java | 9 ++- 3 files changed, 51 insertions(+), 66 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java index 9f08819374582..91b97fa475817 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java @@ -24,7 +24,6 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; -import java.lang.reflect.Field; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -39,11 +38,8 @@ import org.apache.avro.reflect.Nullable; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.client.api.schema.GenericRecord; -import org.apache.pulsar.client.impl.ConsumerImpl; -import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.util.RetryMessageUtil; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; -import org.reflections.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; @@ -620,10 +616,12 @@ public void testRetryTopicByCustomTopicName() throws Exception { @Test(timeOut = 30000L) public void testRetryTopicException() throws Exception { - final String topic = "persistent://my-property/my-ns/retry-topic"; + String retryLetterTopic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/retry-topic"); + final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/input-topic"); final int maxRedeliveryCount = 2; final int sendMessages = 1; // subscribe before publish + @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) .topic(topic) .subscriptionName("my-subscription") @@ -632,7 +630,7 @@ public void testRetryTopicException() throws Exception { .receiverQueueSize(100) .deadLetterPolicy(DeadLetterPolicy.builder() .maxRedeliverCount(maxRedeliveryCount) - .retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry") + .retryLetterTopic(retryLetterTopic) .build()) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscribe(); @@ -645,30 +643,16 @@ public void testRetryTopicException() throws Exception { } producer.close(); - // mock a retry producer exception when reconsumelater is called - MultiTopicsConsumerImpl multiTopicsConsumer = (MultiTopicsConsumerImpl) consumer; - List> consumers = multiTopicsConsumer.getConsumers(); - for (ConsumerImpl c : consumers) { - Set deadLetterPolicyField = - ReflectionUtils.getAllFields(c.getClass(), ReflectionUtils.withName("deadLetterPolicy")); - - if (deadLetterPolicyField.size() != 0) { - Field field = deadLetterPolicyField.iterator().next(); - field.setAccessible(true); - DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) field.get(c); - deadLetterPolicy.setRetryLetterTopic("#persistent://invlaid-topic#"); - } - } + admin.topics().terminateTopic(retryLetterTopic); + Message message = consumer.receive(); log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData())); try { consumer.reconsumeLater(message, 1, TimeUnit.SECONDS); - } catch (PulsarClientException.InvalidTopicNameException e) { - assertEquals(e.getClass(), PulsarClientException.InvalidTopicNameException.class); - } catch (Exception e) { - fail("exception should be PulsarClientException.InvalidTopicNameException"); + fail("exception should be PulsarClientException.TopicTerminatedException"); + } catch (PulsarClientException.TopicTerminatedException e) { + // ok } - consumer.close(); } @@ -721,10 +705,12 @@ public void testRetryProducerWillCloseByConsumer() throws Exception { @Test(timeOut = 30000L) public void testRetryTopicExceptionWithConcurrent() throws Exception { - final String topic = "persistent://my-property/my-ns/retry-topic"; + String retryLetterTopic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/retry-topic"); + final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/input-topic"); final int maxRedeliveryCount = 2; final int sendMessages = 10; // subscribe before publish + @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) .topic(topic) .subscriptionName("my-subscription") @@ -733,7 +719,7 @@ public void testRetryTopicExceptionWithConcurrent() throws Exception { .receiverQueueSize(100) .deadLetterPolicy(DeadLetterPolicy.builder() .maxRedeliverCount(maxRedeliveryCount) - .retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry") + .retryLetterTopic(retryLetterTopic) .build()) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscribe(); @@ -742,24 +728,11 @@ public void testRetryTopicExceptionWithConcurrent() throws Exception { .topic(topic) .create(); for (int i = 0; i < sendMessages; i++) { - producer.newMessage().key("1").value(String.format("Hello Pulsar [%d]", i).getBytes()).send(); + producer.send(String.format("Hello Pulsar [%d]", i).getBytes()); } producer.close(); - // mock a retry producer exception when reconsumelater is called - MultiTopicsConsumerImpl multiTopicsConsumer = (MultiTopicsConsumerImpl) consumer; - List> consumers = multiTopicsConsumer.getConsumers(); - for (ConsumerImpl c : consumers) { - Set deadLetterPolicyField = - ReflectionUtils.getAllFields(c.getClass(), ReflectionUtils.withName("deadLetterPolicy")); - - if (deadLetterPolicyField.size() != 0) { - Field field = deadLetterPolicyField.iterator().next(); - field.setAccessible(true); - DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) field.get(c); - deadLetterPolicy.setRetryLetterTopic("#persistent://invalid-topic#"); - } - } + admin.topics().terminateTopic(retryLetterTopic); List> messages = Lists.newArrayList(); for (int i = 0; i < sendMessages; i++) { @@ -772,15 +745,17 @@ public void testRetryTopicExceptionWithConcurrent() throws Exception { new Thread(() -> { try { consumer.reconsumeLater(message, 1, TimeUnit.SECONDS); - } catch (Exception ignore) { - - } finally { + } catch (PulsarClientException.TopicTerminatedException e) { + // ok latch.countDown(); + } catch (PulsarClientException e) { + // unexpected exception + fail("unexpected exception", e); } }).start(); } - latch.await(); + latch.await(sendMessages, TimeUnit.SECONDS); consumer.close(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 00b9b31d2a063..77a91a944ee6c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1109,10 +1109,29 @@ public void connectionFailed(PulsarClientException exception) { public synchronized CompletableFuture closeAsync() { CompletableFuture closeFuture = new CompletableFuture<>(); + ArrayList> closeFutures = new ArrayList<>(4); + closeFutures.add(closeFuture); + if (retryLetterProducer != null) { + closeFutures.add(retryLetterProducer.thenCompose(p -> p.closeAsync()).whenComplete((ignore, ex) -> { + if (ex != null) { + log.warn("Exception ignored in closing retryLetterProducer of consumer", ex); + } + })); + } + if (deadLetterProducer != null) { + closeFutures.add(deadLetterProducer.thenCompose(p -> p.closeAsync()).whenComplete((ignore, ex) -> { + if (ex != null) { + log.warn("Exception ignored in closing deadLetterProducer of consumer", ex); + } + })); + } + CompletableFuture compositeCloseFuture = FutureUtil.waitForAll(closeFutures); + + if (getState() == State.Closing || getState() == State.Closed) { closeConsumerTasks(); failPendingReceive().whenComplete((r, t) -> closeFuture.complete(null)); - return closeFuture; + return compositeCloseFuture; } consumersClosedCounter.increment(); @@ -1124,7 +1143,7 @@ public synchronized CompletableFuture closeAsync() { deregisterFromClientCnx(); client.cleanupConsumer(this); failPendingReceive().whenComplete((r, t) -> closeFuture.complete(null)); - return closeFuture; + return compositeCloseFuture; } stats.getStatTimeout().ifPresent(Timeout::cancel); @@ -1151,23 +1170,7 @@ public synchronized CompletableFuture closeAsync() { }); } - ArrayList> closeFutures = new ArrayList<>(4); - closeFutures.add(closeFuture); - if (retryLetterProducer != null) { - closeFutures.add(retryLetterProducer.thenCompose(p -> p.closeAsync()).whenComplete((ignore, ex) -> { - if (ex != null) { - log.warn("Exception ignored in closing retryLetterProducer of consumer", ex); - } - })); - } - if (deadLetterProducer != null) { - closeFutures.add(deadLetterProducer.thenCompose(p -> p.closeAsync()).whenComplete((ignore, ex) -> { - if (ex != null) { - log.warn("Exception ignored in closing deadLetterProducer of consumer", ex); - } - })); - } - return FutureUtil.waitForAll(closeFutures); + return compositeCloseFuture; } private void cleanupAtClose(CompletableFuture closeFuture, Throwable exception) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 6f9c5b47c55bb..341272cd69bf8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -638,7 +638,14 @@ public CompletableFuture closeAsync() { CompletableFuture closeFuture = new CompletableFuture<>(); List> futureList = consumers.values().stream() - .map(ConsumerImpl::closeAsync).collect(Collectors.toList()); + .map(consumer -> consumer.closeAsync().exceptionally(t -> { + Throwable cause = FutureUtil.unwrapCompletionException(t); + if (!(cause instanceof PulsarClientException.AlreadyClosedException)) { + log.warn("[{}] [{}] Error closing individual consumer", consumer.getTopic(), + consumer.getSubscription(), cause); + } + return null; + })).collect(Collectors.toList()); FutureUtil.waitForAll(futureList) .thenComposeAsync((r) -> {