Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PIP-74] Support consumer client memory limit #15216

Merged
merged 1 commit into from
May 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ public void testConsumerImpl() throws PulsarClientException {
byte[] data = "data".getBytes(StandardCharsets.UTF_8);

producer.send(data);
Assert.assertNotNull(consumer.receive());
Awaitility.await().until(consumer.scaleReceiverQueueHint::get);
Assert.assertNotNull(consumer.receive());
log.info("getCurrentReceiverQueueSize={}", consumer.getCurrentReceiverQueueSize());

//this will trigger receiver queue size expanding.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.client.impl;

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SizeUnit;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = "broker-impl")
@Slf4j
public class ConsumerMemoryLimitTest extends ProducerConsumerBase {

@BeforeMethod
@Override
protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testConsumerMemoryLimit() throws Exception {
String topic = newTopicName();

ClientBuilder clientBuilder = PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.memoryLimit(10, SizeUnit.KILO_BYTES);

@Cleanup
PulsarTestClient client = PulsarTestClient.create(clientBuilder);

@Cleanup
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) client.newProducer().topic(topic).enableBatching(false)
.blockIfQueueFull(false)
.create();

@Cleanup
ConsumerImpl<byte[]> c1 = (ConsumerImpl<byte[]>) client.newConsumer().subscriptionName("sub").topic(topic)
.autoScaledReceiverQueueSizeEnabled(true).subscribe();
@Cleanup
ConsumerImpl<byte[]> c2 = (ConsumerImpl<byte[]>) client.newConsumer().subscriptionName("sub2").topic(topic)
.autoScaledReceiverQueueSizeEnabled(true).subscribe();
c2.updateAutoScaleReceiverQueueHint();
int n = 5;
for (int i = 0; i < n; i++) {
producer.send(new byte[3000]);
}
Awaitility.await().until(c1.scaleReceiverQueueHint::get);


c1.setCurrentReceiverQueueSize(10);
Awaitility.await().until(() -> c1.incomingMessages.size() == n);
log.info("memory usage:{}", client.getMemoryLimitController().currentUsagePercent());

//1. check memory limit reached,
Assert.assertTrue(client.getMemoryLimitController().currentUsagePercent() > 1);

//2. check c2 can't expand receiver queue.
Assert.assertEquals(c2.getCurrentReceiverQueueSize(), 1);
for (int i = 0; i < n; i++) {
Awaitility.await().until(() -> c2.incomingMessages.size() == 1);
Assert.assertNotNull(c2.receive());
}
Assert.assertTrue(c2.scaleReceiverQueueHint.get());
c2.receiveAsync(); //this should trigger c2 receiver queue size expansion.
Awaitility.await().until(() -> !c2.pendingReceives.isEmpty()); //make sure expectMoreIncomingMessages is called.
Assert.assertEquals(c2.getCurrentReceiverQueueSize(), 1);

//3. producer can't send message;
Assert.expectThrows(PulsarClientException.MemoryBufferIsFullError.class, () -> producer.send(new byte[10]));

//4. ConsumerBase#reduceCurrentReceiverQueueSize is called already. Queue size reduced to 5.
log.info("RQS:{}", c1.getCurrentReceiverQueueSize());
Assert.assertEquals(c1.getCurrentReceiverQueueSize(), 5);

for (int i = 0; i < n; i++) {
c1.receive();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -69,6 +70,7 @@

public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T> {
protected static final int INITIAL_RECEIVER_QUEUE_SIZE = 1;
protected static final double MEMORY_THRESHOLD_FOR_RECEIVER_QUEUE_SIZE_EXPANSION = 0.75;

protected final String subscription;
protected final ConsumerConfigurationData<T> conf;
Expand Down Expand Up @@ -167,20 +169,40 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
initReceiverQueueSize();
}

public void initReceiverQueueSize() {
if (conf.isAutoScaledReceiverQueueSizeEnabled()) {
CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, minReceiverQueueSize());
} else {
CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, maxReceiverQueueSize);
}
}

public abstract void initReceiverQueueSize();
public abstract int minReceiverQueueSize();

protected void expectMoreIncomingMessages() {
if (!conf.isAutoScaledReceiverQueueSizeEnabled()) {
return;
}
if (scaleReceiverQueueHint.compareAndSet(true, false)) {
double usage = getMemoryLimitController().map(MemoryLimitController::currentUsagePercent).orElse(0d);
if (usage < MEMORY_THRESHOLD_FOR_RECEIVER_QUEUE_SIZE_EXPANSION
&& scaleReceiverQueueHint.compareAndSet(true, false)) {
int oldSize = getCurrentReceiverQueueSize();
int newSize = Math.min(maxReceiverQueueSize, oldSize * 2);
setCurrentReceiverQueueSize(newSize);
}
}

protected void reduceCurrentReceiverQueueSize() {
if (!conf.isAutoScaledReceiverQueueSizeEnabled()) {
return;
}
int oldSize = getCurrentReceiverQueueSize();
int newSize = Math.max(minReceiverQueueSize(), oldSize / 2);
if (oldSize > newSize) {
setCurrentReceiverQueueSize(newSize);
}
}

@Override
public Message<T> receive() throws PulsarClientException {
if (listener != null) {
Expand Down Expand Up @@ -796,6 +818,7 @@ protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {
// After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message instance
// anymore, since for pooled messages, this instance was possibly already been released and recycled.
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(messageSize));
updateAutoScaleReceiverQueueHint();
}
return hasEnoughMessagesForBatchReceive();
Expand Down Expand Up @@ -1052,12 +1075,23 @@ protected boolean hasPendingBatchReceive() {
return pendingBatchReceives != null && hasNextBatchReceive();
}

Optional<MemoryLimitController> getMemoryLimitController() {
if (!conf.isAutoScaledReceiverQueueSizeEnabled()) {
//disable memory limit.
return Optional.empty();
} else {
return Optional.of(client.getMemoryLimitController());
}
}

protected void resetIncomingMessageSize() {
INCOMING_MESSAGES_SIZE_UPDATER.getAndSet(this, 0);
long oldSize = INCOMING_MESSAGES_SIZE_UPDATER.getAndSet(this, 0);
getMemoryLimitController().ifPresent(limiter -> limiter.releaseMemory(oldSize));
}

protected void decreaseIncomingMessageSize(final Message<?> message) {
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.size());
getMemoryLimitController().ifPresent(limiter -> limiter.releaseMemory(message.size()));
}

public long getIncomingMessageSize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,18 +413,13 @@ public CompletableFuture<Void> unsubscribeAsync() {
}

@Override
public void initReceiverQueueSize() {
if (conf.isAutoScaledReceiverQueueSizeEnabled()) {
// turn on autoScaledReceiverQueueSize
int size = Math.min(INITIAL_RECEIVER_QUEUE_SIZE, maxReceiverQueueSize);
if (batchReceivePolicy.getMaxNumMessages() > 0) {
// consumerImpl may store (half-1) permits locally.
size = Math.max(size, 2 * batchReceivePolicy.getMaxNumMessages() - 2);
}
CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, size);
} else {
CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, maxReceiverQueueSize);
public int minReceiverQueueSize() {
int size = Math.min(INITIAL_RECEIVER_QUEUE_SIZE, maxReceiverQueueSize);
if (batchReceivePolicy.getMaxNumMessages() > 0) {
// consumerImpl may store (half-1) permits locally.
size = Math.max(size, 2 * batchReceivePolicy.getMaxNumMessages() - 2);
}
return size;
}

@Override
Expand Down Expand Up @@ -1927,7 +1922,11 @@ public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {

@Override
protected void updateAutoScaleReceiverQueueHint() {
scaleReceiverQueueHint.set(getAvailablePermits() + incomingMessages.size() >= getCurrentReceiverQueueSize());
boolean prev = scaleReceiverQueueHint.getAndSet(
getAvailablePermits() + incomingMessages.size() >= getCurrentReceiverQueueSize());
if (log.isDebugEnabled() && prev != scaleReceiverQueueHint.get()) {
log.debug("updateAutoScaleReceiverQueueHint {} -> {}", prev, scaleReceiverQueueHint.get());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,27 @@
public class MemoryLimitController {

private final long memoryLimit;
private final long triggerThreshold;
private final Runnable trigger;
private final AtomicLong currentUsage = new AtomicLong();
private final ReentrantLock mutex = new ReentrantLock(false);
private final Condition condition = mutex.newCondition();

public MemoryLimitController(long memoryLimitBytes) {
this.memoryLimit = memoryLimitBytes;
triggerThreshold = 0;
trigger = null;
}

public MemoryLimitController(long memoryLimitBytes, long triggerThreshold, Runnable trigger) {
this.memoryLimit = memoryLimitBytes;
this.triggerThreshold = triggerThreshold;
this.trigger = trigger;
}

public void forceReserveMemory(long size) {
long newUsage = currentUsage.addAndGet(size);
checkTrigger(newUsage - size, newUsage);
}

public boolean tryReserveMemory(long size) {
Expand All @@ -45,11 +60,18 @@ public boolean tryReserveMemory(long size) {
}

if (currentUsage.compareAndSet(current, newUsage)) {
checkTrigger(current, newUsage);
return true;
}
}
}

private void checkTrigger(long prevUsage, long newUsage) {
if (newUsage >= triggerThreshold && prevUsage < triggerThreshold && trigger != null) {
Jason918 marked this conversation as resolved.
Show resolved Hide resolved
trigger.run();
}
}

public void reserveMemory(long size) throws InterruptedException {
if (!tryReserveMemory(size)) {
mutex.lock();
Expand Down Expand Up @@ -80,4 +102,8 @@ public void releaseMemory(long size) {
public long currentUsage() {
return currentUsage.get();
}

public double currentUsagePercent() {
return 1.0 * currentUsage.get() / memoryLimit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -338,16 +338,15 @@ private boolean isValidConsumerEpoch(Message<T> message) {
}

@Override
public void initReceiverQueueSize() {
if (conf.isAutoScaledReceiverQueueSizeEnabled()) {
int size = Math.min(INITIAL_RECEIVER_QUEUE_SIZE, maxReceiverQueueSize);
if (batchReceivePolicy.getMaxNumMessages() > 0) {
size = Math.max(size, batchReceivePolicy.getMaxNumMessages());
}
CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, size);
} else {
CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, maxReceiverQueueSize);
public int minReceiverQueueSize() {
int size = Math.min(INITIAL_RECEIVER_QUEUE_SIZE, maxReceiverQueueSize);
if (batchReceivePolicy.getMaxNumMessages() > 0) {
size = Math.max(size, batchReceivePolicy.getMaxNumMessages());
}
if (allTopicPartitionsNumber != null) {
size = Math.max(allTopicPartitionsNumber.get(), size);
}
return size;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public class PulsarClientImpl implements PulsarClient {

private static final Logger log = LoggerFactory.getLogger(PulsarClientImpl.class);
private static final int CLOSE_TIMEOUT_SECONDS = 60;
private static final double THRESHOLD_FOR_CONSUMER_RECEIVER_QUEUE_SIZE_SHRINKING = 0.95;

protected final ClientConfigurationData conf;
private final boolean createdExecutorProviders;
Expand Down Expand Up @@ -211,7 +212,9 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG
}
}

memoryLimitController = new MemoryLimitController(conf.getMemoryLimitBytes());
memoryLimitController = new MemoryLimitController(conf.getMemoryLimitBytes(),
(long) (conf.getMemoryLimitBytes() * THRESHOLD_FOR_CONSUMER_RECEIVER_QUEUE_SIZE_SHRINKING),
this::reduceConsumerReceiverQueueSize);
state.set(State.Open);
} catch (Throwable t) {
shutdown();
Expand All @@ -221,6 +224,12 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG
}
}

private void reduceConsumerReceiverQueueSize() {
for (ConsumerBase<?> consumer : consumers) {
consumer.reduceCurrentReceiverQueueSize();
}
}

private void setAuth(ClientConfigurationData conf) throws PulsarClientException {
if (StringUtils.isBlank(conf.getAuthPluginClassName())
|| (StringUtils.isBlank(conf.getAuthParams()) && conf.getAuthParamMap() == null)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,18 @@ public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConf
createTopicIfDoesNotExist);
}

@Override
public int minReceiverQueueSize() {
return 0;
}

@Override
public void initReceiverQueueSize() {
if (conf.isAutoScaledReceiverQueueSizeEnabled()) {
throw new NotImplementedException("AutoScaledReceiverQueueSize is not supported in ZeroQueueConsumerImpl");
} else {
CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, 0);
}

}

@Override
Expand Down
Loading