Skip to content

Commit

Permalink
PIP-74 add support consumer client memory limit (#15216)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jason918 authored May 7, 2022
1 parent 306a0a1 commit 1a098d5
Show file tree
Hide file tree
Showing 8 changed files with 245 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.client.impl;

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SizeUnit;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = "broker-impl")
@Slf4j
public class ConsumerMemoryLimitTest extends ProducerConsumerBase {

@BeforeMethod
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testConsumerMemoryLimit() throws Exception {
String topic = newTopicName();

ClientBuilder clientBuilder = PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.memoryLimit(10, SizeUnit.KILO_BYTES);

@Cleanup
PulsarTestClient client = PulsarTestClient.create(clientBuilder);

@Cleanup
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) client.newProducer().topic(topic).enableBatching(false)
.blockIfQueueFull(false)
.create();

@Cleanup
ConsumerImpl<byte[]> c1 = (ConsumerImpl<byte[]>) client.newConsumer().subscriptionName("sub").topic(topic)
.autoScaledReceiverQueueSizeEnabled(true).subscribe();
@Cleanup
ConsumerImpl<byte[]> c2 = (ConsumerImpl<byte[]>) client.newConsumer().subscriptionName("sub2").topic(topic)
.autoScaledReceiverQueueSizeEnabled(true).subscribe();
c2.updateAutoScaleReceiverQueueHint();
int n = 5;
for (int i = 0; i < n; i++) {
producer.send(new byte[3000]);
}
Awaitility.await().until(c1.scaleReceiverQueueHint::get);


c1.setCurrentReceiverQueueSize(10);
Awaitility.await().until(() -> c1.incomingMessages.size() == n);
log.info("memory usage:{}", client.getMemoryLimitController().currentUsagePercent());

//1. check memory limit reached,
Assert.assertTrue(client.getMemoryLimitController().currentUsagePercent() > 1);

//2. check c2 can't expand receiver queue.
Assert.assertEquals(c2.getCurrentReceiverQueueSize(), 1);
for (int i = 0; i < n; i++) {
Awaitility.await().until(() -> c2.incomingMessages.size() == 1);
Assert.assertNotNull(c2.receive());
}
Assert.assertTrue(c2.scaleReceiverQueueHint.get());
c2.receiveAsync(); //this should trigger c2 receiver queue size expansion.
Awaitility.await().until(() -> !c2.pendingReceives.isEmpty()); //make sure expectMoreIncomingMessages is called.
Assert.assertEquals(c2.getCurrentReceiverQueueSize(), 1);

//3. producer can't send message;
Assert.expectThrows(PulsarClientException.MemoryBufferIsFullError.class, () -> producer.send(new byte[10]));

//4. ConsumerBase#reduceCurrentReceiverQueueSize is called already. Queue size reduced to 5.
log.info("RQS:{}", c1.getCurrentReceiverQueueSize());
Assert.assertEquals(c1.getCurrentReceiverQueueSize(), 5);

for (int i = 0; i < n; i++) {
c1.receive();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -69,6 +70,7 @@

public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T> {
protected static final int INITIAL_RECEIVER_QUEUE_SIZE = 1;
protected static final double MEMORY_THRESHOLD_FOR_RECEIVER_QUEUE_SIZE_EXPANSION = 0.75;

protected final String subscription;
protected final ConsumerConfigurationData<T> conf;
Expand Down Expand Up @@ -166,20 +168,40 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
initReceiverQueueSize();
}

public void initReceiverQueueSize() {
if (conf.isAutoScaledReceiverQueueSizeEnabled()) {
CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, minReceiverQueueSize());
} else {
CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, maxReceiverQueueSize);
}
}

public abstract void initReceiverQueueSize();
public abstract int minReceiverQueueSize();

protected void expectMoreIncomingMessages() {
if (!conf.isAutoScaledReceiverQueueSizeEnabled()) {
return;
}
if (scaleReceiverQueueHint.compareAndSet(true, false)) {
double usage = getMemoryLimitController().map(MemoryLimitController::currentUsagePercent).orElse(0d);
if (usage < MEMORY_THRESHOLD_FOR_RECEIVER_QUEUE_SIZE_EXPANSION
&& scaleReceiverQueueHint.compareAndSet(true, false)) {
int oldSize = getCurrentReceiverQueueSize();
int newSize = Math.min(maxReceiverQueueSize, oldSize * 2);
setCurrentReceiverQueueSize(newSize);
}
}

protected void reduceCurrentReceiverQueueSize() {
if (!conf.isAutoScaledReceiverQueueSizeEnabled()) {
return;
}
int oldSize = getCurrentReceiverQueueSize();
int newSize = Math.max(minReceiverQueueSize(), oldSize / 2);
if (oldSize > newSize) {
setCurrentReceiverQueueSize(newSize);
}
}

@Override
public Message<T> receive() throws PulsarClientException {
if (listener != null) {
Expand Down Expand Up @@ -795,6 +817,7 @@ protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {
// After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message instance
// anymore, since for pooled messages, this instance was possibly already been released and recycled.
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(messageSize));
updateAutoScaleReceiverQueueHint();
}
return hasEnoughMessagesForBatchReceive();
Expand Down Expand Up @@ -1055,12 +1078,23 @@ protected boolean hasPendingBatchReceive() {
return pendingBatchReceives != null && hasNextBatchReceive();
}

Optional<MemoryLimitController> getMemoryLimitController() {
if (!conf.isAutoScaledReceiverQueueSizeEnabled()) {
//disable memory limit.
return Optional.empty();
} else {
return Optional.of(client.getMemoryLimitController());
}
}

protected void resetIncomingMessageSize() {
INCOMING_MESSAGES_SIZE_UPDATER.getAndSet(this, 0);
long oldSize = INCOMING_MESSAGES_SIZE_UPDATER.getAndSet(this, 0);
getMemoryLimitController().ifPresent(limiter -> limiter.releaseMemory(oldSize));
}

protected void decreaseIncomingMessageSize(final Message<?> message) {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.size());
getMemoryLimitController().ifPresent(limiter -> limiter.releaseMemory(message.size()));
}

public long getIncomingMessageSize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,18 +417,13 @@ public CompletableFuture<Void> unsubscribeAsync() {
}

@Override
public void initReceiverQueueSize() {
if (conf.isAutoScaledReceiverQueueSizeEnabled()) {
// turn on autoScaledReceiverQueueSize
int size = Math.min(INITIAL_RECEIVER_QUEUE_SIZE, maxReceiverQueueSize);
if (batchReceivePolicy.getMaxNumMessages() > 0) {
// consumerImpl may store (half-1) permits locally.
size = Math.max(size, 2 * batchReceivePolicy.getMaxNumMessages() - 2);
}
CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, size);
} else {
CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, maxReceiverQueueSize);
public int minReceiverQueueSize() {
int size = Math.min(INITIAL_RECEIVER_QUEUE_SIZE, maxReceiverQueueSize);
if (batchReceivePolicy.getMaxNumMessages() > 0) {
// consumerImpl may store (half-1) permits locally.
size = Math.max(size, 2 * batchReceivePolicy.getMaxNumMessages() - 2);
}
return size;
}

@Override
Expand Down Expand Up @@ -1940,7 +1935,11 @@ public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {

@Override
protected void updateAutoScaleReceiverQueueHint() {
scaleReceiverQueueHint.set(getAvailablePermits() + incomingMessages.size() >= getCurrentReceiverQueueSize());
boolean prev = scaleReceiverQueueHint.getAndSet(
getAvailablePermits() + incomingMessages.size() >= getCurrentReceiverQueueSize());
if (log.isDebugEnabled() && prev != scaleReceiverQueueHint.get()) {
log.debug("updateAutoScaleReceiverQueueHint {} -> {}", prev, scaleReceiverQueueHint.get());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,27 @@
public class MemoryLimitController {

private final long memoryLimit;
private final long triggerThreshold;
private final Runnable trigger;
private final AtomicLong currentUsage = new AtomicLong();
private final ReentrantLock mutex = new ReentrantLock(false);
private final Condition condition = mutex.newCondition();

public MemoryLimitController(long memoryLimitBytes) {
this.memoryLimit = memoryLimitBytes;
triggerThreshold = 0;
trigger = null;
}

public MemoryLimitController(long memoryLimitBytes, long triggerThreshold, Runnable trigger) {
this.memoryLimit = memoryLimitBytes;
this.triggerThreshold = triggerThreshold;
this.trigger = trigger;
}

public void forceReserveMemory(long size) {
long newUsage = currentUsage.addAndGet(size);
checkTrigger(newUsage - size, newUsage);
}

public boolean tryReserveMemory(long size) {
Expand All @@ -45,11 +60,18 @@ public boolean tryReserveMemory(long size) {
}

if (currentUsage.compareAndSet(current, newUsage)) {
checkTrigger(current, newUsage);
return true;
}
}
}

private void checkTrigger(long prevUsage, long newUsage) {
if (newUsage >= triggerThreshold && prevUsage < triggerThreshold && trigger != null) {
trigger.run();
}
}

public void reserveMemory(long size) throws InterruptedException {
if (!tryReserveMemory(size)) {
mutex.lock();
Expand Down Expand Up @@ -80,4 +102,8 @@ public void releaseMemory(long size) {
public long currentUsage() {
return currentUsage.get();
}

public double currentUsagePercent() {
return 1.0 * currentUsage.get() / memoryLimit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -338,16 +338,15 @@ private boolean isValidConsumerEpoch(Message<T> message) {
}

@Override
public void initReceiverQueueSize() {
if (conf.isAutoScaledReceiverQueueSizeEnabled()) {
int size = Math.min(INITIAL_RECEIVER_QUEUE_SIZE, maxReceiverQueueSize);
if (batchReceivePolicy.getMaxNumMessages() > 0) {
size = Math.max(size, batchReceivePolicy.getMaxNumMessages());
}
CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, size);
} else {
CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, maxReceiverQueueSize);
public int minReceiverQueueSize() {
int size = Math.min(INITIAL_RECEIVER_QUEUE_SIZE, maxReceiverQueueSize);
if (batchReceivePolicy.getMaxNumMessages() > 0) {
size = Math.max(size, batchReceivePolicy.getMaxNumMessages());
}
if (allTopicPartitionsNumber != null) {
size = Math.max(allTopicPartitionsNumber.get(), size);
}
return size;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public class PulsarClientImpl implements PulsarClient {

private static final Logger log = LoggerFactory.getLogger(PulsarClientImpl.class);
private static final int CLOSE_TIMEOUT_SECONDS = 60;
private static final double THRESHOLD_FOR_CONSUMER_RECEIVER_QUEUE_SIZE_SHRINKING = 0.95;

protected final ClientConfigurationData conf;
private final boolean createdExecutorProviders;
Expand Down Expand Up @@ -211,7 +212,9 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG
}
}

memoryLimitController = new MemoryLimitController(conf.getMemoryLimitBytes());
memoryLimitController = new MemoryLimitController(conf.getMemoryLimitBytes(),
(long) (conf.getMemoryLimitBytes() * THRESHOLD_FOR_CONSUMER_RECEIVER_QUEUE_SIZE_SHRINKING),
this::reduceConsumerReceiverQueueSize);
state.set(State.Open);
} catch (Throwable t) {
shutdown();
Expand All @@ -221,6 +224,12 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG
}
}

private void reduceConsumerReceiverQueueSize() {
for (ConsumerBase<?> consumer : consumers) {
consumer.reduceCurrentReceiverQueueSize();
}
}

private void setAuth(ClientConfigurationData conf) throws PulsarClientException {
if (StringUtils.isBlank(conf.getAuthPluginClassName())
|| (StringUtils.isBlank(conf.getAuthParams()) && conf.getAuthParamMap() == null)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,18 @@ public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConf
createTopicIfDoesNotExist);
}

@Override
public int minReceiverQueueSize() {
return 0;
}

@Override
public void initReceiverQueueSize() {
if (conf.isAutoScaledReceiverQueueSizeEnabled()) {
throw new NotImplementedException("AutoScaledReceiverQueueSize is not supported in ZeroQueueConsumerImpl");
} else {
CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, 0);
}

}

@Override
Expand Down
Loading

0 comments on commit 1a098d5

Please sign in to comment.