Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.intercept.MockBrokerInterceptor;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker-impl")
public class ClientInterruptTest extends ProducerConsumerBase {

private final CreationInterceptor interceptor = new CreationInterceptor();
private int index = 0;
private PulsarClientImpl client;
private String topic;
private ExecutorService executor;
private CompletableFuture<Void> delayTriggered;

@BeforeClass
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
client = (PulsarClientImpl) PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
pulsar.getBrokerService().setInterceptor(interceptor);
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@BeforeMethod
public void setupTopic() {
interceptor.numConsumerCreated.set(0);
interceptor.numProducerCreated.set(0);
executor = Executors.newCachedThreadPool();
TopicName topicName = TopicName.get("test-topic-" + index++);
topic = topicName.toString();

final var mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" + topicName.getPersistenceNamingEncoding();
delayTriggered = new CompletableFuture<>();
mockZooKeeper.delay(1000L, (op, path) -> {
final var result = path.equals(mlPath);
if (result) {
log.info("Injected delay for {} {}", op, path);
delayTriggered.complete(null);
}
return result;
});
}

@AfterMethod(alwaysRun = true, timeOut = 10000)
public void cleanupTopic() {
executor.shutdown();
}

@Test(timeOut = 10000)
public void testCreateProducer() throws Exception {
testCreateInterrupt("producer", () -> client.newProducer().topic(topic).create());
}

@Test(timeOut = 10000)
public void testSubscribe() throws Exception {
testCreateInterrupt("consumer", () -> client.newConsumer().topic(topic).subscriptionName("sub")
.subscribe());
}

@Test(timeOut = 10000)
public void testCreateReader() throws Exception {
testCreateInterrupt("reader", () -> client.newReader().topic(topic).startMessageId(MessageId.earliest)
.create());
}

private void testCreateInterrupt(String name, PulsarClientSyncTask task) throws Exception {
final var exception = new AtomicReference<PulsarClientException>();
final var threadInterrupted = new CompletableFuture<Boolean>();
final var future = executor.submit(() -> {
try {
task.run();
exception.set(new PulsarClientException("Task " + name + " succeeded"));
} catch (PulsarClientException e) {
exception.set(e);
}

try {
Thread.sleep(1);
threadInterrupted.complete(false);
} catch (InterruptedException __) {
threadInterrupted.complete(true);
}
});
delayTriggered.get();
future.cancel(true);

Awaitility.await().untilAsserted(() -> assertNotNull(exception.get()));
assertTrue(exception.get().getCause() instanceof InterruptedException);

Awaitility.await().untilAsserted(() -> assertTrue(pulsar.getBrokerService().getTopics().containsKey(topic)));
final var persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get()
.orElseThrow();

if (name.equals("producer")) {
Awaitility.await().untilAsserted(() -> assertEquals(interceptor.numProducerCreated.get(), 1));
// Verify the created producer will eventually be closed
Awaitility.await().untilAsserted(() -> {
assertEquals(persistentTopic.getProducers().size(), 0);
assertEquals(client.producersCount(), 0);
});
} else {
Awaitility.await().untilAsserted(() -> assertEquals(interceptor.numConsumerCreated.get(), 1));
// Verify the created consumer will eventually be closed
Awaitility.await().untilAsserted(() -> {
persistentTopic.getSubscriptions().values().forEach(subscription ->
assertTrue(subscription.getConsumers().isEmpty()));
assertEquals(client.consumersCount(), 0);
});
}
// The thread's interrupt state should not be set, it's the caller's responsibility to set the interrupt state
// if necessary when catching the `PulsarClientException` that wraps an `InterruptedException`
assertFalse(threadInterrupted.get());
}

private interface PulsarClientSyncTask {

void run() throws PulsarClientException;
}


private static class CreationInterceptor extends MockBrokerInterceptor {

final AtomicInteger numProducerCreated = new AtomicInteger(0);
final AtomicInteger numConsumerCreated = new AtomicInteger(0);

@Override
public void producerCreated(ServerCnx cnx, Producer producer, Map<String, String> metadata) {
numProducerCreated.incrementAndGet();
}

@Override
public void consumerCreated(ServerCnx cnx, Consumer consumer, Map<String, String> metadata) {
numConsumerCreated.incrementAndGet();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,13 @@

import static org.testng.Assert.assertTrue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand All @@ -54,47 +48,6 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testInterruptedWhenCreateConsumer() throws InterruptedException {

String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
String subName = "test-sub";
String mlCursorPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/"
+ TopicName.get(tpName).getPersistenceNamingEncoding() + "/" + subName;

// Make create cursor delay 1s
CountDownLatch topicLoadLatch = new CountDownLatch(1);
for (int i = 0; i < 5; i++) {
mockZooKeeper.delay(1000, (op, path) -> {
if (mlCursorPath.equals(path)) {
topicLoadLatch.countDown();
return true;
}
return false;
});
}

Thread startConsumer = new Thread(() -> {
try {
pulsarClient.newConsumer()
.topic(tpName)
.subscriptionName(subName)
.subscribe();
Assert.fail("Should have thrown an exception");
} catch (PulsarClientException e) {
assertTrue(e.getCause() instanceof InterruptedException);
}
});
startConsumer.start();
topicLoadLatch.await();
startConsumer.interrupt();

PulsarClientImpl clientImpl = (PulsarClientImpl) pulsarClient;
Awaitility.await().ignoreExceptions().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
Assert.assertEquals(clientImpl.consumersCount(), 0);
});
}

@Test
public void testReceiveWillDoneAfterClosedConsumer() throws Exception {
String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
private ConsumerConfigurationData<T> conf;
private final Schema<T> schema;
private List<ConsumerInterceptor<T>> interceptorList;
private volatile boolean interruptedBeforeConsumerCreation;

private static final long MIN_ACK_TIMEOUT_MILLIS = 1000;
private static final long MIN_TICK_TIME_MILLIS = 100;
Expand Down Expand Up @@ -101,31 +100,8 @@ public ConsumerBuilder<T> clone() {

@Override
public Consumer<T> subscribe() throws PulsarClientException {
CompletableFuture<Consumer<T>> future = new CompletableFuture<>();
try {
subscribeAsync().whenComplete((c, e) -> {
if (e != null) {
// If the subscription fails, there is no need to close the consumer here,
// as it will be handled in the subscribeAsync method.
future.completeExceptionally(e);
return;
}
if (interruptedBeforeConsumerCreation) {
c.closeAsync().exceptionally(closeEx -> {
log.error("Failed to close consumer after interruption", closeEx.getCause());
return null;
});
future.completeExceptionally(new PulsarClientException(
"Subscription was interrupted before the consumer could be fully created"));
} else {
future.complete(c);
}
});
return future.get();
} catch (InterruptedException e) {
interruptedBeforeConsumerCreation = true;
Thread.currentThread().interrupt();
throw PulsarClientException.unwrap(e);
return FutureUtil.getAndCleanupOnInterrupt(subscribeAsync(), Consumer::closeAsync);
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public ProducerBuilder<T> clone() {
@Override
public Producer<T> create() throws PulsarClientException {
try {
return createAsync().get();
return FutureUtil.getAndCleanupOnInterrupt(createAsync(), Producer::closeAsync);
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public ReaderBuilder<T> clone() {
@Override
public Reader<T> create() throws PulsarClientException {
try {
return createAsync().get();
return FutureUtil.getAndCleanupOnInterrupt(createAsync(), Reader::closeAsync);
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -381,4 +382,32 @@ public static void safeRunAsync(Runnable runnable,
return null;
});
}

/**
* Blocks to get the result of a CompletableFuture, while ensuring resources are cleaned up
* if the wait is interrupted.
* <p>
* If the current thread is interrupted while waiting, this method registers a cleanup action
* to be executed when the future eventually completes. This prevents resource leaks that
* could otherwise occur when an interruption happens but the underlying asynchronous task
* finishes successfully later. After registering the action, it re-throws the
* {@link InterruptedException}.
*
* @param future The CompletableFuture to wait for.
* @param cleanupAction A consumer that performs a cleanup action (e.g., closing a resource)
* on the result if the wait is interrupted.
* @param <T> The type of the future's result.
* @return The computed result from the future.
* @throws InterruptedException if the current thread was interrupted while waiting.
* @throws ExecutionException if the future completed exceptionally.
*/
public static <T> T getAndCleanupOnInterrupt(CompletableFuture<T> future, Consumer<T> cleanupAction)
throws InterruptedException, ExecutionException {
try {
return future.get();
} catch (InterruptedException e) {
future.thenAccept(cleanupAction);
throw e;
}
}
}
Loading