Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix consumer fetch message number maps to read entry number bug and expose avgMessagesPerEntry metric #6719

Merged
merged 6 commits into from
May 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,9 @@ dispatcherMinReadBatchSize=1
# Max number of entries to dispatch for a shared subscription. By default it is 20 entries.
dispatcherMaxRoundRobinBatchSize=20

# Enable precise dispatcher flow control, which sizes each read using the historical average number of messages per entry
preciseDispatcherFlowControl=false

# Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
maxConcurrentLookupRequest=50000

Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ dispatchThrottlingRateRelativeToPublishRate=false
# backlog.
dispatchThrottlingOnNonBacklogConsumerEnabled=true

# Enable precise dispatcher flow control, which sizes each read using the historical average number of messages per entry
preciseDispatcherFlowControl=false

# Max number of concurrent lookup request broker allows to throttle heavy incoming lookup traffic
maxConcurrentLookupRequest=50000

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int dispatcherMaxRoundRobinBatchSize = 20;

@FieldContext(
dynamic = true,
category = CATEGORY_SERVER,
doc = "Precise dispatcher flow control according to history message number of each entry"
)
private boolean preciseDispatcherFlowControl = false;

@FieldContext(
dynamic = true,
category = CATEGORY_SERVER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import io.netty.buffer.ByteBuf;

import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

Expand All @@ -48,6 +49,8 @@ public abstract class AbstractDispatcherMultipleConsumers extends AbstractBaseDi
.newUpdater(AbstractDispatcherMultipleConsumers.class, "isClosed");
private volatile int isClosed = FALSE;

private Random random = new Random(42);

protected AbstractDispatcherMultipleConsumers(Subscription subscription) {
super(subscription);
}
Expand Down Expand Up @@ -142,6 +145,21 @@ public Consumer getNextConsumer() {
return null;
}

/**
 * Picks one consumer from {@code consumerList} at a randomly generated index.
 *
 * @return {@code null} when {@code consumerList} is empty or the dispatcher is marked closed;
 *         otherwise the consumer stored at a random index of {@code consumerList}
 */
public Consumer getRandomConsumer() {
    final boolean nothingToPick = consumerList.isEmpty() || IS_CLOSED_UPDATER.get(this) == TRUE;
    if (nothingToPick) {
        // Abort the read: no consumers are connected, or a disconnect has been initiated.
        return null;
    }

    final int pick = random.nextInt(consumerList.size());
    return consumerList.get(pick);
}


/**
* Finds index of first available consumer which has higher priority then given targetPriority
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,18 @@ public class Consumer {

private final PulsarApi.KeySharedMeta keySharedMeta;

/**
* Running average of the number of messages per entry sent to this consumer.
* The initial value is 1000; each time a new value arrives, the average is updated as
* avgMessagesPerEntry = avgMessagesPerEntry * avgPercent + (1 - avgPercent) * newValue.
*/
private static final AtomicIntegerFieldUpdater<Consumer> AVG_MESSAGES_PER_ENTRY =
AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "avgMessagesPerEntry");
private volatile int avgMessagesPerEntry = 1000;

private static final double avgPercent = 0.9;
private boolean preciseDispatcherFlowControl;

public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId,
int priorityLevel, String consumerName,
int maxUnackedMessages, ServerCnx cnx, String appId,
Expand All @@ -136,9 +148,12 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
this.msgOutCounter = new LongAdder();
this.appId = appId;
this.authenticationData = cnx.authenticationData;
this.preciseDispatcherFlowControl = cnx.isPreciseDispatcherFlowControl();

PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0);
MESSAGE_PERMITS_UPDATER.set(this, 0);
UNACKED_MESSAGES_UPDATER.set(this, 0);
AVG_MESSAGES_PER_ENTRY.set(this, 1000);

this.metadata = metadata != null ? metadata : Collections.emptyMap();

Expand Down Expand Up @@ -223,6 +238,12 @@ public ChannelPromise sendMessages(final List<Entry> entries, EntryBatchSizes ba
}
}

// calculate avg message per entry
int tmpAvgMessagesPerEntry = AVG_MESSAGES_PER_ENTRY.get(this);
tmpAvgMessagesPerEntry = (int) Math.round(tmpAvgMessagesPerEntry * avgPercent +
(1 - avgPercent) * totalMessages / entries.size());
AVG_MESSAGES_PER_ENTRY.set(this, tmpAvgMessagesPerEntry);

// reduce permit and increment unackedMsg count with total number of messages in batch-msgs
MESSAGE_PERMITS_UPDATER.addAndGet(this, -totalMessages);
incrementUnackedMessages(totalMessages);
Expand Down Expand Up @@ -432,6 +453,10 @@ public int getAvailablePermits() {
return MESSAGE_PERMITS_UPDATER.get(this);
}

/**
 * Reads the running average of messages per entry through its atomic field updater.
 *
 * @return the current value of {@code avgMessagesPerEntry}
 */
public int getAvgMessagesPerEntry() {
    final int currentAverage = AVG_MESSAGES_PER_ENTRY.get(this);
    return currentAverage;
}

/**
 * Reports the current value of the {@code blockedConsumerOnUnackedMsgs} flag.
 *
 * @return {@code true} when the flag is set
 */
public boolean isBlocked() {
    return this.blockedConsumerOnUnackedMsgs;
}
Expand Down Expand Up @@ -471,6 +496,7 @@ public ConsumerStats getStats() {
stats.availablePermits = getAvailablePermits();
stats.unackedMessages = unackedMessages;
stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs;
stats.avgMessagesPerEntry = getAvgMessagesPerEntry();
return stats;
}

Expand Down Expand Up @@ -653,5 +679,9 @@ private void clearUnAckedMsgs() {
subscription.addUnAckedMessages(-unaAckedMsgs);
}

/**
 * Tells whether precise dispatcher flow control applies to this consumer. The value is the one
 * obtained from the connection ({@code cnx}) in the constructor.
 *
 * @return the stored {@code preciseDispatcherFlowControl} flag
 */
public boolean isPreciseDispatcherFlowControl() {
    return this.preciseDispatcherFlowControl;
}

private static final Logger log = LoggerFactory.getLogger(Consumer.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ public class ServerCnx extends PulsarHandler {
private final boolean schemaValidationEnforced;
private String authMethod = "none";
private final int maxMessageSize;
private boolean preciseDispatcherFlowControl;

// Flag to manage throttling-rate by atomically enable/disable read-channel.
private volatile boolean autoReadDisabledRateLimiting = false;
Expand Down Expand Up @@ -191,6 +192,7 @@ public ServerCnx(PulsarService pulsar) {
this.maxMessageSize = pulsar.getConfiguration().getMaxMessageSize();
this.maxPendingSendRequests = pulsar.getConfiguration().getMaxPendingPublishdRequestsPerConnection();
this.resumeReadsThreshold = maxPendingSendRequests / 2;
this.preciseDispatcherFlowControl = pulsar.getConfiguration().isPreciseDispatcherFlowControl();
}

@Override
Expand Down Expand Up @@ -1952,4 +1954,8 @@ void setMessagePublishBufferSize(long bufferSize) {
/**
 * Overwrites the volatile {@code autoReadDisabledRateLimiting} flag.
 *
 * @param isLimiting new value of the flag
 */
void setAutoReadDisabledRateLimiting(boolean isLimiting) {
    autoReadDisabledRateLimiting = isLimiting;
}

/**
 * Tells whether precise dispatcher flow control is enabled for this connection. The value is
 * read from the broker configuration in the constructor.
 *
 * @return the stored {@code preciseDispatcherFlowControl} flag
 */
public boolean isPreciseDispatcherFlowControl() {
    return this.preciseDispatcherFlowControl;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public static void writeConsumerStats(StatsOutputStream statsStream, PulsarApi.C
statsStream.writePair("msgRateOut", stats.msgRateOut);
statsStream.writePair("msgThroughputOut", stats.msgThroughputOut);
statsStream.writePair("msgRateRedeliver", stats.msgRateRedeliver);
statsStream.writePair("avgMessagesPerEntry", stats.avgMessagesPerEntry);

if (Subscription.isIndividualAckMode(subType)) {
statsStream.writePair("unackedMessages", stats.unackedMessages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ public synchronized void consumerFlow(Consumer consumer, int additionalNumberOfM
}

totalAvailablePermits += additionalNumberOfMessages;

if (log.isDebugEnabled()) {
log.debug("[{}-{}] Trigger new read after receiving flow control message with permits {}", name, consumer,
totalAvailablePermits);
Expand All @@ -256,6 +257,12 @@ public void readMoreEntries() {
if (currentTotalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
int messagesToRead = Math.min(currentTotalAvailablePermits, readBatchSize);

Consumer c = getRandomConsumer();
// if precise dispatcher flow control is turned on, adjust the number of entries to read
if (c != null && c.isPreciseDispatcherFlowControl()) {
messagesToRead = Math.min((int) Math.ceil(currentTotalAvailablePermits * 1.0 / c.getAvgMessagesPerEntry()), readBatchSize);
}

if (!isConsumerWritable()) {
// If the connection is not currently writable, we issue the read request anyway, but for a single
// message. The intent here is to keep use the request as a notification mechanism while avoiding to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,11 @@ protected void readMoreEntries(Consumer consumer) {
}

int messagesToRead = Math.min(availablePermits, readBatchSize);
// if precise dispatcher flow control is turned on, adjust the number of entries to read
if (consumer.isPreciseDispatcherFlowControl()) {
int avgMessagesPerEntry = consumer.getAvgMessagesPerEntry();
messagesToRead = Math.min((int) Math.ceil(availablePermits * 1.0 / avgMessagesPerEntry), readBatchSize);
}

// throttle only if: (1) cursor is not active (or flag for throttle-nonBacklogConsumer is enabled) bcz
// active-cursor reads message from cache rather from bookkeeper (2) if topic has reached message-rate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,15 @@ protected final void internalSetup() throws Exception {
pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
}

/**
 * Initializes the test environment via {@link #init(boolean)}, resolves the lookup URL and
 * creates the Pulsar client used by the test.
 *
 * @param isPreciseDispatcherFlowControl value forwarded to {@code init(boolean)}
 * @throws Exception if initialization, URI parsing or client creation fails
 */
protected final void internalSetup(boolean isPreciseDispatcherFlowControl) throws Exception {
    init(isPreciseDispatcherFlowControl);

    // Start from the broker URL; switch to the broker service URL when TCP lookup is selected.
    lookupUrl = new URI(brokerUrl.toString());
    if (isTcpLookup) {
        final String serviceUrl = pulsar.getBrokerServiceUrl();
        lookupUrl = new URI(serviceUrl);
    }

    final String clientUrl = lookupUrl.toString();
    pulsarClient = newPulsarClient(clientUrl, 0);
}

/**
 * Builds a Pulsar client for the given service URL.
 *
 * @param url            service URL handed to the client builder
 * @param intervalInSecs stats interval, in seconds, handed to the client builder
 * @return the client produced by the builder
 * @throws PulsarClientException if the builder fails to create the client
 */
protected PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException {
    return PulsarClient.builder()
            .serviceUrl(url)
            .statsInterval(intervalInSecs, TimeUnit.SECONDS)
            .build();
}
Expand Down Expand Up @@ -149,6 +158,26 @@ protected final void init() throws Exception {
startBroker();
}

/**
 * Prepares the broker configuration, the executors and the mocked ZooKeeper/BookKeeper
 * instances, then starts the broker.
 *
 * @param isPreciseDispatcherFlowControl value written to the configuration's
 *                                       {@code preciseDispatcherFlowControl} setting
 * @throws Exception if creating the mocks or starting the broker fails
 */
protected final void init(boolean isPreciseDispatcherFlowControl) throws Exception {
    // Every broker and web-service port (plain and TLS) is configured as port 0.
    final Optional<Integer> portZero = Optional.of(0);
    this.conf.setBrokerServicePort(portZero);
    this.conf.setBrokerServicePortTls(portZero);
    this.conf.setAdvertisedAddress("localhost");
    this.conf.setWebServicePort(portZero);
    this.conf.setWebServicePortTls(portZero);
    this.conf.setPreciseDispatcherFlowControl(isPreciseDispatcherFlowControl);

    sameThreadOrderedSafeExecutor = new SameThreadOrderedSafeExecutor();

    // Single-threaded executor handed to the mock BookKeeper; uncaught errors are only logged.
    bkExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
            .setNameFormat("mock-pulsar-bk")
            .setUncaughtExceptionHandler((worker, failure) -> log.info("Uncaught exception", failure))
            .build());

    mockZooKeeper = createMockZooKeeper();
    mockBookKeeper = createMockBookKeeper(mockZooKeeper, bkExecutor);

    startBroker();
}

protected final void internalCleanup() throws Exception {
try {
// if init fails, some of these could be null, and if so would throw
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import org.testng.annotations.BeforeMethod;

/**
 * Variant of {@link SimpleProducerConsumerTest} whose per-method setup passes {@code true} to
 * {@code internalSetup}, i.e. it runs with the precise-dispatcher-flow-control argument enabled.
 */
public class ConsumerPreciseDispatcherFlowControl extends SimpleProducerConsumerTest {

    /**
     * Replaces the parent setup: initializes the environment with the flag set to {@code true},
     * then runs the producer base setup.
     *
     * @throws Exception if either setup step fails
     */
    @Override
    @BeforeMethod
    protected void setup() throws Exception {
        final boolean preciseFlowControl = true;
        super.internalSetup(preciseFlowControl);
        super.producerBaseSetup();
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -2040,7 +2040,6 @@ public void testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhile
}

// client should not receive all produced messages and should be blocked due to unack-messages
assertEquals(messages1.size(), receiverQueueSize);
Set<MessageIdImpl> redeliveryMessages = messages1.stream().map(m -> {
return (MessageIdImpl) m.getMessageId();
}).collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ public class ConsumerStats {
/** Number of unacknowledged messages for the consumer. */
public int unackedMessages;

/** Average number of messages per entry consumed by this consumer. */
public int avgMessagesPerEntry;

/** Flag to verify if consumer is blocked due to reaching threshold of unacked messages. */
public boolean blockedConsumerOnUnackedMsgs;

Expand Down