diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientInterruptTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientInterruptTest.java new file mode 100644 index 0000000000000..d6a86fc7959c2 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientInterruptTest.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.intercept.MockBrokerInterceptor; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.Producer; +import org.apache.pulsar.broker.service.ServerCnx; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.naming.TopicName; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-impl") +public class ClientInterruptTest extends ProducerConsumerBase { + + private final CreationInterceptor interceptor = new CreationInterceptor(); + private int index = 0; + private PulsarClientImpl client; + private String topic; + private ExecutorService executor; + private CompletableFuture delayTriggered; + + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + client = (PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build(); + pulsar.getBrokerService().setInterceptor(interceptor); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @BeforeMethod + public void setupTopic() { + interceptor.numConsumerCreated.set(0); + interceptor.numProducerCreated.set(0); + executor = Executors.newCachedThreadPool(); + TopicName topicName = TopicName.get("test-topic-" + index++); + topic = topicName.toString(); + + final var mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" + topicName.getPersistenceNamingEncoding(); + delayTriggered = new CompletableFuture<>(); + mockZooKeeper.delay(1000L, (op, path) -> { + final var result = path.equals(mlPath); + if (result) { + log.info("Injected delay for {} {}", op, path); + delayTriggered.complete(null); + } + return result; + }); + } + + @AfterMethod(alwaysRun = true, timeOut = 10000) + public void cleanupTopic() { + executor.shutdown(); + } + + @Test(timeOut = 10000) + public void testCreateProducer() throws Exception { + testCreateInterrupt("producer", () -> client.newProducer().topic(topic).create()); + } + + @Test(timeOut = 10000) + public void testSubscribe() throws Exception { + testCreateInterrupt("consumer", () -> client.newConsumer().topic(topic).subscriptionName("sub") + .subscribe()); + } + + @Test(timeOut = 10000) + public void testCreateReader() throws Exception { + testCreateInterrupt("reader", () -> client.newReader().topic(topic).startMessageId(MessageId.earliest) + .create()); + } + + private void testCreateInterrupt(String name, PulsarClientSyncTask task) throws Exception { + final var exception = new AtomicReference(); + final var threadInterrupted = new CompletableFuture(); + final var future = executor.submit(() -> { + try { + task.run(); + exception.set(new PulsarClientException("Task " + name + " succeeded")); + } catch (PulsarClientException e) { + exception.set(e); + } + + try { + Thread.sleep(1); + threadInterrupted.complete(false); + } catch (InterruptedException __) { + threadInterrupted.complete(true); + } + }); + delayTriggered.get(); + future.cancel(true); + + Awaitility.await().untilAsserted(() -> assertNotNull(exception.get())); + assertTrue(exception.get().getCause() instanceof InterruptedException); + + Awaitility.await().untilAsserted(() -> assertTrue(pulsar.getBrokerService().getTopics().containsKey(topic))); + final var persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get() + .orElseThrow(); + + if (name.equals("producer")) { + Awaitility.await().untilAsserted(() -> assertEquals(interceptor.numProducerCreated.get(), 1)); + // Verify the created producer will eventually be closed + Awaitility.await().untilAsserted(() -> { + assertEquals(persistentTopic.getProducers().size(), 0); + assertEquals(client.producersCount(), 0); + }); + } else { + Awaitility.await().untilAsserted(() -> assertEquals(interceptor.numConsumerCreated.get(), 1)); + // Verify the created consumer will eventually be closed + Awaitility.await().untilAsserted(() -> { + persistentTopic.getSubscriptions().values().forEach(subscription -> + assertTrue(subscription.getConsumers().isEmpty())); + assertEquals(client.consumersCount(), 0); + }); + } + // The thread's interrupt state should not be set, it's the caller's responsibility to set the interrupt state + // if necessary when catching the `PulsarClientException` that wraps an `InterruptedException` + assertFalse(threadInterrupted.get()); + } + + private interface PulsarClientSyncTask { + + void run() throws PulsarClientException; + } + + + private static class CreationInterceptor extends MockBrokerInterceptor { + + final AtomicInteger numProducerCreated = new AtomicInteger(0); + final AtomicInteger numConsumerCreated = new AtomicInteger(0); + + @Override + public void producerCreated(ServerCnx cnx, Producer producer, Map metadata) { + numProducerCreated.incrementAndGet(); + } + + @Override + public void consumerCreated(ServerCnx cnx, Consumer consumer, Map metadata) { + numConsumerCreated.incrementAndGet(); + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java index 4d6a7a6cf622f..b9355d19c2767 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerCloseTest.java @@ -20,19 +20,13 @@ import static org.testng.Assert.assertTrue; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; -import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.ProducerConsumerBase; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.awaitility.Awaitility; -import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -54,47 +48,6 @@ protected void cleanup() throws Exception { super.internalCleanup(); } - @Test - public void testInterruptedWhenCreateConsumer() throws InterruptedException { - - String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); - String subName = "test-sub"; - String mlCursorPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" - + TopicName.get(tpName).getPersistenceNamingEncoding() + "/" + subName; - - // Make create cursor delay 1s - CountDownLatch topicLoadLatch = new CountDownLatch(1); - for (int i = 0; i < 5; i++) { - mockZooKeeper.delay(1000, (op, path) -> { - if (mlCursorPath.equals(path)) { - topicLoadLatch.countDown(); - return true; - } - return false; - }); - } - - Thread startConsumer = new Thread(() -> { - try { - pulsarClient.newConsumer() - .topic(tpName) - .subscriptionName(subName) - .subscribe(); - Assert.fail("Should have thrown an exception"); - } catch (PulsarClientException e) { - assertTrue(e.getCause() instanceof InterruptedException); - } - }); - startConsumer.start(); - topicLoadLatch.await(); - startConsumer.interrupt(); - - PulsarClientImpl clientImpl = (PulsarClientImpl) pulsarClient; - Awaitility.await().ignoreExceptions().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { - Assert.assertEquals(clientImpl.consumersCount(), 0); - }); - } - @Test public void testReceiveWillDoneAfterClosedConsumer() throws Exception { String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index ce602a0ec9f36..ab64119cb8f29 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -71,7 +71,6 @@ public class ConsumerBuilderImpl implements ConsumerBuilder { private ConsumerConfigurationData conf; private final Schema schema; private List> interceptorList; - private volatile boolean interruptedBeforeConsumerCreation; private static final long MIN_ACK_TIMEOUT_MILLIS = 1000; private static final long MIN_TICK_TIME_MILLIS = 100; @@ -101,31 +100,8 @@ public ConsumerBuilder clone() { @Override public Consumer subscribe() throws PulsarClientException { - CompletableFuture> future = new CompletableFuture<>(); try { - subscribeAsync().whenComplete((c, e) -> { - if (e != null) { - // If the subscription fails, there is no need to close the consumer here, - // as it will be handled in the subscribeAsync method. - future.completeExceptionally(e); - return; - } - if (interruptedBeforeConsumerCreation) { - c.closeAsync().exceptionally(closeEx -> { - log.error("Failed to close consumer after interruption", closeEx.getCause()); - return null; - }); - future.completeExceptionally(new PulsarClientException( - "Subscription was interrupted before the consumer could be fully created")); - } else { - future.complete(c); - } - }); - return future.get(); - } catch (InterruptedException e) { - interruptedBeforeConsumerCreation = true; - Thread.currentThread().interrupt(); - throw PulsarClientException.unwrap(e); + return FutureUtil.getAndCleanupOnInterrupt(subscribeAsync(), Consumer::closeAsync); } catch (Exception e) { throw PulsarClientException.unwrap(e); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java index 0aba7db2925db..7c33cba964527 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java @@ -84,7 +84,7 @@ public ProducerBuilder clone() { @Override public Producer create() throws PulsarClientException { try { - return createAsync().get(); + return FutureUtil.getAndCleanupOnInterrupt(createAsync(), Producer::closeAsync); } catch (Exception e) { throw PulsarClientException.unwrap(e); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java index d0ab90068ed31..629a076c59605 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java @@ -73,7 +73,7 @@ public ReaderBuilder clone() { @Override public Reader create() throws PulsarClientException { try { - return createAsync().get(); + return FutureUtil.getAndCleanupOnInterrupt(createAsync(), Reader::closeAsync); } catch (Exception e) { throw PulsarClientException.unwrap(e); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java index b9ee70dc825d2..ae784a1b18db3 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java @@ -35,6 +35,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -381,4 +382,32 @@ public static void safeRunAsync(Runnable runnable, return null; }); } + + /** + * Blocks to get the result of a CompletableFuture, while ensuring resources are cleaned up + * if the wait is interrupted. + *

+ * If the current thread is interrupted while waiting, this method registers a cleanup action + * to be executed when the future eventually completes. This prevents resource leaks that + * could otherwise occur when an interruption happens but the underlying asynchronous task + * finishes successfully later. After registering the action, it re-throws the + * {@link InterruptedException}. + * + * @param future The CompletableFuture to wait for. + * @param cleanupAction A consumer that performs a cleanup action (e.g., closing a resource) + * on the result if the wait is interrupted. + * @param The type of the future's result. + * @return The computed result from the future. + * @throws InterruptedException if the current thread was interrupted while waiting. + * @throws ExecutionException if the future completed exceptionally. + */ + public static T getAndCleanupOnInterrupt(CompletableFuture future, Consumer cleanupAction) + throws InterruptedException, ExecutionException { + try { + return future.get(); + } catch (InterruptedException e) { + future.thenAccept(cleanupAction); + throw e; + } + } }