Skip to content

Commit

Permalink
[improve][client]PIP-359:Support custom message listener executor for…
Browse files Browse the repository at this point in the history
… specific subscription (apache#22861)

Co-authored-by: duanlinlin <[email protected]>
[PIP-359](apache#22902)
Support custom message listener thread pool for specific subscription, avoid individual subscription listener consuming too much time leading to higher consumption delay in other subscriptions.

<!--
### Contribution Checklist

  - PR title format should be *[type][component] summary*. For details, see *[Guideline - Pulsar PR Naming Convention](https://pulsar.apache.org/contribute/develop-semantic-title/)*.

  - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.

  - Each pull request should address only one issue, not mix up code from multiple issues.

  - Each commit in the pull request has a meaningful commit message

  - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
-->

<!-- Either this PR fixes an issue, -->

<!-- or this PR is one task of an issue -->

<!-- If the PR belongs to a PIP, please add the PIP link here -->

<!-- Details of when a PIP is required and how the PIP process work, please see: https://github.com/apache/pulsar/blob/master/pip/README.md -->

### Motivation
In our scenario, there is a centralized message proxy service, this service will use the same PulsarClient instance to create a lot of subscription groups to consume many topics and cache messages locally.Then the business will pull messages from the cache of the proxy service. It seems that there is no problem, but during use, we found that when the
message processing time of several consumer groups (listener mode) is very high, it almost affects all consumer groups responsible for the proxy service, causing a large number of message delays.

By analyzing the source code, we found that by default, all consumer instances created from the same PulsarClient will share a thread pool to process message listeners, and sometimes there are multiple consumer message listeners bound to the same thread. Obviously, when a consumer processes messages and causes long-term blocking, it will cause the messages of other consumers bound to the thread to fail to be processed in time, resulting in message delays. Therefore, for this scenario, it may be necessary to support specific a message listener thread pool with consumer latitudes to avoid mutual influence between different consumers.

<!-- Explain here the context, and why you're making that change. What is the problem you're trying to solve. -->

### Modifications
Support custom message listener thread pool for specific subscription.
<!-- Describe the modifications you've done. -->

(cherry picked from commit 10f4e02)
  • Loading branch information
AuroraTwinkle authored and xiangying committed Sep 2, 2024
1 parent 766d2a4 commit c5846bb
Show file tree
Hide file tree
Showing 6 changed files with 280 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import static org.testng.Assert.assertTrue;
import com.google.common.util.concurrent.Uninterruptibles;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Cleanup;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = "broker-api")
public class MessageListenerExecutorTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(MessageListenerExecutorTest.class);

@BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Override
protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
// Set listenerThreads to 1 to reproduce the pr more easily in #22861
clientBuilder.listenerThreads(1);
}

@Test
public void testConsumerMessageListenerExecutorIsolation() throws Exception {
log.info("-- Starting {} test --", methodName);

@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();
List<CompletableFuture<Long>> maxConsumeDelayWithDisableIsolationFutures = new ArrayList<>();
int loops = 5;
long consumeSleepTimeMs = 10000;
for (int i = 0; i < loops; i++) {
// The first consumer will consume messages with sleep block 1s,
// and the others will consume messages without sleep block.
// The maxConsumeDelayWithDisableIsolation of all consumers
// should be greater than sleepTimeMs cause by disable MessageListenerExecutor.
CompletableFuture<Long> maxConsumeDelayFuture = startConsumeAndComputeMaxConsumeDelay(
"persistent://my-property/my-ns/testConsumerMessageListenerDisableIsolation-" + i,
"my-sub-testConsumerMessageListenerDisableIsolation-" + i,
i == 0 ? Duration.ofMillis(consumeSleepTimeMs) : Duration.ofMillis(0),
false,
executor);
maxConsumeDelayWithDisableIsolationFutures.add(maxConsumeDelayFuture);
}

// ensure all consumers consume messages delay more than consumeSleepTimeMs
boolean allDelayMoreThanConsumeSleepTimeMs = maxConsumeDelayWithDisableIsolationFutures.stream()
.map(CompletableFuture::join)
.allMatch(delay -> delay > consumeSleepTimeMs);
assertTrue(allDelayMoreThanConsumeSleepTimeMs);

List<CompletableFuture<Long>> maxConsumeDelayWhitEnableIsolationFutures = new ArrayList<>();
for (int i = 0; i < loops; i++) {
// The first consumer will consume messages with sleep block 1s,
// and the others will consume messages without sleep block.
// The maxConsumeDelayWhitEnableIsolation of the first consumer
// should be greater than sleepTimeMs, and the others should be
// less than sleepTimeMs, cause by enable MessageListenerExecutor.
CompletableFuture<Long> maxConsumeDelayFuture = startConsumeAndComputeMaxConsumeDelay(
"persistent://my-property/my-ns/testConsumerMessageListenerEnableIsolation-" + i,
"my-sub-testConsumerMessageListenerEnableIsolation-" + i,
i == 0 ? Duration.ofMillis(consumeSleepTimeMs) : Duration.ofMillis(0),
true,
executor);
maxConsumeDelayWhitEnableIsolationFutures.add(maxConsumeDelayFuture);
}

assertTrue(maxConsumeDelayWhitEnableIsolationFutures.get(0).join() > consumeSleepTimeMs);
boolean remainingAlmostNoDelay = maxConsumeDelayWhitEnableIsolationFutures.stream()
.skip(1)
.map(CompletableFuture::join)
.allMatch(delay -> delay < 1000);
assertTrue(remainingAlmostNoDelay);

log.info("-- Exiting {} test --", methodName);
}

private CompletableFuture<Long> startConsumeAndComputeMaxConsumeDelay(String topic, String subscriptionName,
Duration consumeSleepTime,
boolean enableMessageListenerExecutorIsolation,
ExecutorService executorService)
throws Exception {
int numMessages = 2;
final CountDownLatch latch = new CountDownLatch(numMessages);
int numPartitions = 50;
TopicName nonIsolationTopicName = TopicName.get(topic);
admin.topics().createPartitionedTopic(nonIsolationTopicName.toString(), numPartitions);

AtomicLong maxConsumeDelay = new AtomicLong(-1);
ConsumerBuilder<Long> consumerBuilder =
pulsarClient.newConsumer(Schema.INT64)
.topic(nonIsolationTopicName.toString())
.subscriptionName(subscriptionName)
.messageListener((c1, msg) -> {
Assert.assertNotNull(msg, "Message cannot be null");
log.debug("Received message [{}] in the listener", msg.getValue());
c1.acknowledgeAsync(msg);
maxConsumeDelay.set(Math.max(maxConsumeDelay.get(),
System.currentTimeMillis() - msg.getValue()));
if (consumeSleepTime.toMillis() > 0) {
Uninterruptibles.sleepUninterruptibly(consumeSleepTime);
}
latch.countDown();
});

ExecutorService executor = Executors.newSingleThreadExecutor(
new ExecutorProvider.ExtendedThreadFactory(subscriptionName + "listener-executor-", true));
if (enableMessageListenerExecutorIsolation) {
consumerBuilder.messageListenerExecutor((message, runnable) -> executor.execute(runnable));
}

Consumer<Long> consumer = consumerBuilder.subscribe();
ProducerBuilder<Long> producerBuilder = pulsarClient.newProducer(Schema.INT64)
.topic(nonIsolationTopicName.toString());

Producer<Long> producer = producerBuilder.create();
List<Future<MessageId>> futures = new ArrayList<>();

// Asynchronously produce messages
for (int i = 0; i < numMessages; i++) {
Future<MessageId> future = producer.sendAsync(System.currentTimeMillis());
futures.add(future);
}

log.info("Waiting for async publish to complete");
for (Future<MessageId> future : futures) {
future.get();
}

CompletableFuture<Long> maxDelayFuture = new CompletableFuture<>();

CompletableFuture.runAsync(() -> {
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, executorService).whenCompleteAsync((v, ex) -> {
maxDelayFuture.complete(maxConsumeDelay.get());
try {
producer.close();
consumer.close();
executor.shutdownNow();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
});

return maxDelayFuture;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,21 @@ public interface ConsumerBuilder<T> extends Cloneable {
*/
ConsumerBuilder<T> messageListener(MessageListener<T> messageListener);

/**
* Set the {@link MessageListenerExecutor} to be used for message listeners of <b>current consumer</b>.
* <i>(default: use executor from PulsarClient,
* {@link org.apache.pulsar.client.impl.PulsarClientImpl#externalExecutorProvider})</i>.
*
* <p>The listener thread pool is exclusively owned by current consumer
* that are using a "listener" model to get messages. For a given internal consumer,
* the listener will always be invoked from the same thread, to ensure ordering.
*
* <p> The caller need to shut down the thread pool after closing the consumer to avoid leaks.
* @param messageListenerExecutor the executor of the consumer message listener
* @return the consumer builder instance
*/
ConsumerBuilder<T> messageListenerExecutor(MessageListenerExecutor messageListenerExecutor);

/**
* Sets a {@link CryptoKeyReader}.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

/**
* Interface for providing service to execute message listeners.
*/
public interface MessageListenerExecutor {

/**
* select a thread by message to execute the runnable!
* <p>
* Suggestions:
* <p>
* 1. The message listener task will be submitted to this executor for execution,
* so the implementations of this interface should carefully consider execution
* order if sequential consumption is required.
* </p>
* <p>
* 2. The users should release resources(e.g. threads) of the executor after closing
* the consumer to avoid leaks.
* </p>
* @param message the message
* @param runnable the runnable to execute, that is, the message listener task
*/
void execute(Message<?> message, Runnable runnable);
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessageListenerExecutor;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
Expand Down Expand Up @@ -82,6 +83,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
protected final MessageListener<T> listener;
protected final ConsumerEventListener consumerEventListener;
protected final ExecutorProvider executorProvider;
protected final MessageListenerExecutor messageListenerExecutor;
protected final ExecutorService externalPinnedExecutor;
protected final ExecutorService internalPinnedExecutor;
protected UnAckedMessageTracker unAckedMessageTracker;
Expand Down Expand Up @@ -139,6 +141,11 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
this.unAckedChunkedMessageIdSequenceMap =
ConcurrentOpenHashMap.<MessageIdAdv, MessageIdImpl[]>newBuilder().build();
this.executorProvider = executorProvider;
this.messageListenerExecutor = conf.getMessageListenerExecutor() == null
? (conf.getSubscriptionType() == SubscriptionType.Key_Shared
? this::executeKeySharedMessageListener
: this::executeMessageListener)
: conf.getMessageListenerExecutor();
this.externalPinnedExecutor = executorProvider.getExecutor();
this.internalPinnedExecutor = client.getInternalExecutorService();
this.pendingReceives = Queues.newConcurrentLinkedQueue();
Expand Down Expand Up @@ -1117,14 +1124,7 @@ private void triggerListener() {
// internal pinned executor thread while the message processing happens
final Message<T> finalMsg = msg;
MESSAGE_LISTENER_QUEUE_SIZE_UPDATER.incrementAndGet(this);
if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) {
executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
callMessageListener(finalMsg));
} else {
getExternalExecutor(msg).execute(() -> {
callMessageListener(finalMsg);
});
}
messageListenerExecutor.execute(msg, () -> callMessageListener(finalMsg));
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription);
Expand All @@ -1137,6 +1137,14 @@ private void triggerListener() {
});
}

private void executeMessageListener(Message<?> message, Runnable runnable) {
getExternalExecutor(message).execute(runnable);
}

private void executeKeySharedMessageListener(Message<?> message, Runnable runnable) {
executorProvider.getExecutor(peekMessageKey(message)).execute(runnable);
}

protected void callMessageListener(Message<T> msg) {
try {
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -1166,7 +1174,7 @@ protected void callMessageListener(Message<T> msg) {
}

static final byte[] NONE_KEY = "NONE_KEY".getBytes(StandardCharsets.UTF_8);
protected byte[] peekMessageKey(Message<T> msg) {
protected byte[] peekMessageKey(Message<?> msg) {
byte[] key = NONE_KEY;
if (msg.hasKey()) {
key = msg.getKeyBytes();
Expand Down Expand Up @@ -1233,7 +1241,7 @@ public int getCurrentReceiverQueueSize() {

protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);

private ExecutorService getExternalExecutor(Message<T> msg) {
private ExecutorService getExternalExecutor(Message<?> msg) {
ConsumerImpl receivedConsumer = (msg instanceof TopicMessageImpl) ? ((TopicMessageImpl) msg).receivedByconsumer
: null;
ExecutorService executor = receivedConsumer != null && receivedConsumer.externalPinnedExecutor != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessageListenerExecutor;
import org.apache.pulsar.client.api.MessagePayloadProcessor;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
Expand Down Expand Up @@ -299,6 +300,13 @@ public ConsumerBuilder<T> messageListener(@NonNull MessageListener<T> messageLis
return this;
}

@Override
public ConsumerBuilder<T> messageListenerExecutor(MessageListenerExecutor messageListenerExecutor) {
checkArgument(messageListenerExecutor != null, "messageListenerExecutor needs to be not null");
conf.setMessageListenerExecutor(messageListenerExecutor);
return this;
}

@Override
public ConsumerBuilder<T> consumerEventListener(@NonNull ConsumerEventListener consumerEventListener) {
conf.setConsumerEventListener(consumerEventListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessageListenerExecutor;
import org.apache.pulsar.client.api.MessagePayloadProcessor;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
Expand Down Expand Up @@ -90,6 +91,8 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {

private SubscriptionMode subscriptionMode = SubscriptionMode.Durable;

@JsonIgnore
private transient MessageListenerExecutor messageListenerExecutor;
@JsonIgnore
private MessageListener<T> messageListener;

Expand Down

0 comments on commit c5846bb

Please sign in to comment.