Skip to content

Commit

Permalink
fix consumer fetch message number maps to read entry number bug and e…
Browse files Browse the repository at this point in the history
…xpose avgMessagesPerEntry metric (apache#6719)

### Motivation
when consumer send fetch request to broker server, it contains fetch message number telling the server how many messges should be pushed to consumer client. However, the broker server stores data in bookkeeper or broker cache according to entry not single message if producer produce message using batch feature. There is a gap to map the number of message to the number of entry when dealing with consumer fetch request.

Current strategy just using the following calculating formula to deal with those situation:
```
messagesToRead = Math.min(availablePermits, readBatchSize);
```
`availablePermits` is the number of message the consumer requested, and `readBatchSize` is the max read entry size set in broker.conf

Assuming `availablePermits` is 1000 and `readBatchSize` is 500, and each entry contains 1000 messages, messagesToRead will be 500 according to this formula. The broker server will read 500 entry, that is `500 * 1000 = 500,000` messages from bookkeeper or broker cache and push `500,000` messages to consumer at one time though the consumer just need 1000 messages, which leading the consumer cost too much memory to store the fetched message, especially when we increase `readBatchSize` to increase bookkeeper read throughput.

### Changes
I add an variable `avgMessagesPerEntry` to record average messages stored in one entry. It will update when broker server push message to the consumer using the following calculating formula
```
avgMessagesPerEntry = avgMessagePerEntry * avgPercent + (1 - avgPercent) * new Value
```
`avgMessagePerEntry` is the history average message number per entry and `new Value` is the message number per entry in the fetch request the broker read from cache or bookkeeper. `avgPercent` is a final value 0.9, and the value just control the history avgMessagePerEntry decay rate when update new one. The avgMessagePerEntry initial value is 1000.

When dealing with consumer fetch request, it will map fetch requst number to entry number according to the following formula:
```
messagesToRead = Math.min((int) Math.ceil(availablePermits * 1.0 / avgMessagesPerEntry), readBatchSize);
```

I also expose the avgMessagePerEntry static value to consumer stat metric json.
  • Loading branch information
hangc0276 authored and Huanli-Meng committed Jun 1, 2020
1 parent 43c3736 commit a539a7a
Show file tree
Hide file tree
Showing 13 changed files with 144 additions and 1 deletion.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,9 @@ dispatcherMinReadBatchSize=1
# Max number of entries to dispatch for a shared subscription. By default it is 20 entries.
dispatcherMaxRoundRobinBatchSize=20

# Precise dispathcer flow control according to history message number of each entry
preciseDispatcherFlowControl=false

# Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
maxConcurrentLookupRequest=50000

Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ dispatchThrottlingRateRelativeToPublishRate=false
# backlog.
dispatchThrottlingOnNonBacklogConsumerEnabled=true

# Precise dispathcer flow control according to history message number of each entry
preciseDispatcherFlowControl=false

# Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
maxConcurrentLookupRequest=50000

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int dispatcherMaxRoundRobinBatchSize = 20;

@FieldContext(
dynamic = true,
category = CATEGORY_SERVER,
doc = "Precise dispatcher flow control according to history message number of each entry"
)
private boolean preciseDispatcherFlowControl = false;

@FieldContext(
dynamic = true,
category = CATEGORY_SERVER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import io.netty.buffer.ByteBuf;

import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

Expand All @@ -48,6 +49,8 @@ public abstract class AbstractDispatcherMultipleConsumers extends AbstractBaseDi
.newUpdater(AbstractDispatcherMultipleConsumers.class, "isClosed");
private volatile int isClosed = FALSE;

private Random random = new Random(42);

protected AbstractDispatcherMultipleConsumers(Subscription subscription) {
super(subscription);
}
Expand Down Expand Up @@ -142,6 +145,21 @@ public Consumer getNextConsumer() {
return null;
}

/**
* Get random consumer from consumerList.
*
* @return null if no consumer available, else return random consumer from consumerList
*/
public Consumer getRandomConsumer() {
if (consumerList.isEmpty() || IS_CLOSED_UPDATER.get(this) == TRUE) {
// abort read if no consumers are connected of if disconnect is initiated
return null;
}

return consumerList.get(random.nextInt(consumerList.size()));
}


/**
* Finds index of first available consumer which has higher priority then given targetPriority
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,18 @@ public class Consumer {

private final PulsarApi.KeySharedMeta keySharedMeta;

/**
* It starts keep tracking the average messages per entry.
* The initial value is 1000, when new value comes, it will update with
* avgMessagesPerEntry = avgMessagePerEntry * avgPercent + (1 - avgPercent) * new Value.
*/
private static final AtomicIntegerFieldUpdater<Consumer> AVG_MESSAGES_PER_ENTRY =
AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "avgMessagesPerEntry");
private volatile int avgMessagesPerEntry = 1000;

private static final double avgPercent = 0.9;
private boolean preciseDispatcherFlowControl;

public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId,
int priorityLevel, String consumerName,
int maxUnackedMessages, ServerCnx cnx, String appId,
Expand All @@ -136,9 +148,12 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
this.msgOutCounter = new LongAdder();
this.appId = appId;
this.authenticationData = cnx.authenticationData;
this.preciseDispatcherFlowControl = cnx.isPreciseDispatcherFlowControl();

PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0);
MESSAGE_PERMITS_UPDATER.set(this, 0);
UNACKED_MESSAGES_UPDATER.set(this, 0);
AVG_MESSAGES_PER_ENTRY.set(this, 1000);

this.metadata = metadata != null ? metadata : Collections.emptyMap();

Expand Down Expand Up @@ -223,6 +238,12 @@ public ChannelPromise sendMessages(final List<Entry> entries, EntryBatchSizes ba
}
}

// calculate avg message per entry
int tmpAvgMessagesPerEntry = AVG_MESSAGES_PER_ENTRY.get(this);
tmpAvgMessagesPerEntry = (int) Math.round(tmpAvgMessagesPerEntry * avgPercent +
(1 - avgPercent) * totalMessages / entries.size());
AVG_MESSAGES_PER_ENTRY.set(this, tmpAvgMessagesPerEntry);

// reduce permit and increment unackedMsg count with total number of messages in batch-msgs
MESSAGE_PERMITS_UPDATER.addAndGet(this, -totalMessages);
incrementUnackedMessages(totalMessages);
Expand Down Expand Up @@ -432,6 +453,10 @@ public int getAvailablePermits() {
return MESSAGE_PERMITS_UPDATER.get(this);
}

public int getAvgMessagesPerEntry() {
return AVG_MESSAGES_PER_ENTRY.get(this);
}

public boolean isBlocked() {
return blockedConsumerOnUnackedMsgs;
}
Expand Down Expand Up @@ -471,6 +496,7 @@ public ConsumerStats getStats() {
stats.availablePermits = getAvailablePermits();
stats.unackedMessages = unackedMessages;
stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs;
stats.avgMessagesPerEntry = getAvgMessagesPerEntry();
return stats;
}

Expand Down Expand Up @@ -653,5 +679,9 @@ private void clearUnAckedMsgs() {
subscription.addUnAckedMessages(-unaAckedMsgs);
}

public boolean isPreciseDispatcherFlowControl() {
return preciseDispatcherFlowControl;
}

private static final Logger log = LoggerFactory.getLogger(Consumer.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ public class ServerCnx extends PulsarHandler {
private final boolean schemaValidationEnforced;
private String authMethod = "none";
private final int maxMessageSize;
private boolean preciseDispatcherFlowControl;

// Flag to manage throttling-rate by atomically enable/disable read-channel.
private volatile boolean autoReadDisabledRateLimiting = false;
Expand Down Expand Up @@ -191,6 +192,7 @@ public ServerCnx(PulsarService pulsar) {
this.maxMessageSize = pulsar.getConfiguration().getMaxMessageSize();
this.maxPendingSendRequests = pulsar.getConfiguration().getMaxPendingPublishdRequestsPerConnection();
this.resumeReadsThreshold = maxPendingSendRequests / 2;
this.preciseDispatcherFlowControl = pulsar.getConfiguration().isPreciseDispatcherFlowControl();
}

@Override
Expand Down Expand Up @@ -1952,4 +1954,8 @@ void setMessagePublishBufferSize(long bufferSize) {
void setAutoReadDisabledRateLimiting(boolean isLimiting) {
this.autoReadDisabledRateLimiting = isLimiting;
}

public boolean isPreciseDispatcherFlowControl() {
return preciseDispatcherFlowControl;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public static void writeConsumerStats(StatsOutputStream statsStream, PulsarApi.C
statsStream.writePair("msgRateOut", stats.msgRateOut);
statsStream.writePair("msgThroughputOut", stats.msgThroughputOut);
statsStream.writePair("msgRateRedeliver", stats.msgRateRedeliver);
statsStream.writePair("avgMessagesPerEntry", stats.avgMessagesPerEntry);

if (Subscription.isIndividualAckMode(subType)) {
statsStream.writePair("unackedMessages", stats.unackedMessages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfM
}

totalAvailablePermits += additionalNumberOfMessages;

if (log.isDebugEnabled()) {
log.debug("[{}-{}] Trigger new read after receiving flow control message with permits {}", name, consumer,
totalAvailablePermits);
Expand All @@ -256,6 +257,12 @@ public void readMoreEntries() {
if (currentTotalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
int messagesToRead = Math.min(currentTotalAvailablePermits, readBatchSize);

Consumer c = getRandomConsumer();
// if turn on precise dispatcher flow control, adjust the record to read
if (c != null && c.isPreciseDispatcherFlowControl()) {
messagesToRead = Math.min((int) Math.ceil(currentTotalAvailablePermits * 1.0 / c.getAvgMessagesPerEntry()), readBatchSize);
}

if (!isConsumerWritable()) {
// If the connection is not currently writable, we issue the read request anyway, but for a single
// message. The intent here is to keep use the request as a notification mechanism while avoiding to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,11 @@ protected void readMoreEntries(Consumer consumer) {
}

int messagesToRead = Math.min(availablePermits, readBatchSize);
// if turn of precise dispatcher flow control, adjust the records to read
if (consumer.isPreciseDispatcherFlowControl()) {
int avgMessagesPerEntry = consumer.getAvgMessagesPerEntry();
messagesToRead = Math.min((int) Math.ceil(availablePermits * 1.0 / avgMessagesPerEntry), readBatchSize);
}

// throttle only if: (1) cursor is not active (or flag for throttle-nonBacklogConsumer is enabled) bcz
// active-cursor reads message from cache rather from bookkeeper (2) if topic has reached message-rate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,15 @@ protected final void internalSetup() throws Exception {
pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
}

protected final void internalSetup(boolean isPreciseDispatcherFlowControl) throws Exception {
init(isPreciseDispatcherFlowControl);
lookupUrl = new URI(brokerUrl.toString());
if (isTcpLookup) {
lookupUrl = new URI(pulsar.getBrokerServiceUrl());
}
pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
}

protected PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException {
return PulsarClient.builder().serviceUrl(url).statsInterval(intervalInSecs, TimeUnit.SECONDS).build();
}
Expand Down Expand Up @@ -148,6 +157,26 @@ protected final void init() throws Exception {
startBroker();
}

protected final void init(boolean isPreciseDispatcherFlowControl) throws Exception {
this.conf.setBrokerServicePort(Optional.of(0));
this.conf.setBrokerServicePortTls(Optional.of(0));
this.conf.setAdvertisedAddress("localhost");
this.conf.setWebServicePort(Optional.of(0));
this.conf.setWebServicePortTls(Optional.of(0));
this.conf.setPreciseDispatcherFlowControl(isPreciseDispatcherFlowControl);

sameThreadOrderedSafeExecutor = new SameThreadOrderedSafeExecutor();
bkExecutor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat("mock-pulsar-bk")
.setUncaughtExceptionHandler((thread, ex) -> log.info("Uncaught exception", ex))
.build());

mockZooKeeper = createMockZooKeeper();
mockBookKeeper = createMockBookKeeper(mockZooKeeper, bkExecutor);

startBroker();
}

protected final void internalCleanup() throws Exception {
try {
// if init fails, some of these could be null, and if so would throw
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import org.testng.annotations.BeforeMethod;

public class ConsumerPreciseDispatcherFlowControl extends SimpleProducerConsumerTest{

@BeforeMethod
@Override
protected void setup() throws Exception {
super.internalSetup(true);
super.producerBaseSetup();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2040,7 +2040,6 @@ public void testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhile
}

// client should not receive all produced messages and should be blocked due to unack-messages
assertEquals(messages1.size(), receiverQueueSize);
Set<MessageIdImpl> redeliveryMessages = messages1.stream().map(m -> {
return (MessageIdImpl) m.getMessageId();
}).collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ public class ConsumerStats {
/** Number of unacknowledged messages for the consumer. */
public int unackedMessages;

/** Number of average messages per entry for the consumer consumed. */
public int avgMessagesPerEntry;

/** Flag to verify if consumer is blocked due to reaching threshold of unacked messages. */
public boolean blockedConsumerOnUnackedMsgs;

Expand Down

0 comments on commit a539a7a

Please sign in to comment.