Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce precise topic publish rate limiting #7078

Merged
merged 2 commits into from
Jun 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ "it uses more CPU to perform frequent check. (Disable publish throttling with value 0)"
)
private int topicPublisherThrottlingTickTimeMillis = 5;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Enable precise rate limit for topic publish"
)
private boolean preciseTopicPublishRateLimiterEnable = false;
@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ public abstract class AbstractTopic implements Topic {

protected volatile PublishRateLimiter topicPublishRateLimiter;

protected boolean preciseTopicPublishRateLimitingEnable;

private LongAdder bytesInCounter = new LongAdder();
private LongAdder msgInCounter = new LongAdder();

Expand All @@ -102,6 +104,8 @@ public AbstractTopic(String topic, BrokerService brokerService) {
} catch (Exception e) {
log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage());
}
this.preciseTopicPublishRateLimitingEnable =
brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable();
updatePublishDispatcher(policies);
}

Expand All @@ -123,6 +127,18 @@ protected boolean isProducersExceeded() {
return false;
}

public void disableCnxAutoRead() {
if (producers != null) {
producers.values().forEach(producer -> producer.getCnx().disableCnxAutoRead());
}
}

public void enableCnxAutoRead() {
if (producers != null) {
producers.values().forEach(producer -> producer.getCnx().enableCnxAutoRead());
}
}

protected boolean hasLocalProducers() {
AtomicBoolean foundLocal = new AtomicBoolean(false);
producers.values().forEach(producer -> {
Expand Down Expand Up @@ -373,6 +389,18 @@ public boolean isPublishRateExceeded() {
getBrokerPublishRateLimiter().isPublishRateExceeded();
}

@Override
public boolean isTopicPublishRateExceeded(int numberMessages, int bytes) {
// whether topic publish rate exceed if precise rate limit is enable
return preciseTopicPublishRateLimitingEnable && !this.topicPublishRateLimiter.tryAcquire(numberMessages, bytes);
}

@Override
public boolean isBrokerPublishRateExceeded() {
// whether broker publish rate exceed
return getBrokerPublishRateLimiter().isPublishRateExceeded();
}

public PublishRateLimiter getTopicPublishRateLimiter() {
return topicPublishRateLimiter;
}
Expand All @@ -393,12 +421,21 @@ private void updatePublishDispatcher(Policies policies) {
if (publishRate != null
&& (publishRate.publishThrottlingRateInByte > 0 || publishRate.publishThrottlingRateInMsg > 0)) {
log.info("Enabling publish rate limiting {} on topic {}", publishRate, this.topic);
// lazy init Publish-rateLimiting monitoring if not initialized yet
this.brokerService.setupTopicPublishRateLimiterMonitor();

// if not precise mode, lazy init Publish-rateLimiting monitoring if not initialized yet
if (!preciseTopicPublishRateLimitingEnable) {
this.brokerService.setupTopicPublishRateLimiterMonitor();
}

if (this.topicPublishRateLimiter == null
|| this.topicPublishRateLimiter == PublishRateLimiter.DISABLED_RATE_LIMITER) {
// create new rateLimiter if rate-limiter is disabled
this.topicPublishRateLimiter = new PublishRateLimiterImpl(policies, clusterName);
if (preciseTopicPublishRateLimitingEnable) {
this.topicPublishRateLimiter = new PrecisPublishLimiter(policies, clusterName,
() -> AbstractTopic.this.enableCnxAutoRead());
} else {
this.topicPublishRateLimiter = new PublishRateLimiterImpl(policies, clusterName);
}
} else {
this.topicPublishRateLimiter.update(policies, clusterName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private Channel listenChannel;
private Channel listenChannelTls;

private boolean preciseTopicPublishRateLimitingEnable;
private final long maxMessagePublishBufferBytes;
private final long resumeProducerReadMessagePublishBufferBytes;
private volatile boolean reachMessagePublishBufferThreshold;
Expand All @@ -234,6 +235,7 @@ public BrokerService(PulsarService pulsar) throws Exception {
this.maxMessagePublishBufferBytes = pulsar.getConfiguration().getMaxMessagePublishBufferSizeInMB() > 0 ?
pulsar.getConfiguration().getMaxMessagePublishBufferSizeInMB() * 1024L * 1024L : -1;
this.resumeProducerReadMessagePublishBufferBytes = this.maxMessagePublishBufferBytes / 2;
this.preciseTopicPublishRateLimitingEnable = pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable();
this.managedLedgerFactory = pulsar.getManagedLedgerFactory();
this.topics = new ConcurrentOpenHashMap<>();
this.replicationClients = new ConcurrentOpenHashMap<>();
Expand Down Expand Up @@ -1586,9 +1588,11 @@ private void updateConfigurationAndRegisterListeners() {
updateBrokerPublisherThrottlingMaxRate());

// add listener to notify topic publish-rate monitoring
registerConfigurationListener("topicPublisherThrottlingTickTimeMillis", (publisherThrottlingTickTimeMillis) -> {
setupTopicPublishRateLimiterMonitor();
});
if (!preciseTopicPublishRateLimitingEnable) {
registerConfigurationListener("topicPublisherThrottlingTickTimeMillis", (publisherThrottlingTickTimeMillis) -> {
setupTopicPublishRateLimiterMonitor();
});
}

// add more listeners here
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
*/
package org.apache.pulsar.broker.service;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.util.RateLimitFunction;
import org.apache.pulsar.common.util.RateLimiter;

public interface PublishRateLimiter {

Expand Down Expand Up @@ -66,6 +69,83 @@ public interface PublishRateLimiter {
* @param clusterName
*/
void update(PublishRate maxPublishRate);

/**
* try to acquire permit
* @param numbers
* @param bytes
* */
boolean tryAcquire(int numbers, long bytes);
}

class PrecisPublishLimiter implements PublishRateLimiter {
protected volatile int publishMaxMessageRate = 0;
aloyszhang marked this conversation as resolved.
Show resolved Hide resolved
protected volatile long publishMaxByteRate = 0;
protected volatile boolean publishThrottlingEnabled = false;
// precise mode for publish rate limiter
private RateLimiter topicPublishRateLimiterOnMessage;
private RateLimiter topicPublishRateLimiterOnByte;
private final RateLimitFunction rateLimitFunction;

public PrecisPublishLimiter(Policies policies, String clusterName, RateLimitFunction rateLimitFunction) {
this.rateLimitFunction = rateLimitFunction;
update(policies, clusterName);
}

@Override
public void checkPublishRate() {
// No-op
}

@Override
public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
// No-op
}

@Override
public boolean resetPublishCount() {
return true;
}

@Override
public boolean isPublishRateExceeded() {
return false;
}


@Override
public void update(Policies policies, String clusterName) {
final PublishRate maxPublishRate = policies.publishMaxMessageRate != null
? policies.publishMaxMessageRate.get(clusterName)
: null;
this.update(maxPublishRate);
}
public void update(PublishRate maxPublishRate) {
if (maxPublishRate != null
&& (maxPublishRate.publishThrottlingRateInMsg > 0 || maxPublishRate.publishThrottlingRateInByte > 0)) {
this.publishThrottlingEnabled = true;
this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0);
this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
if (this.publishMaxMessageRate > 0) {
topicPublishRateLimiterOnMessage = new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction);
}
if (this.publishMaxByteRate > 0) {
topicPublishRateLimiterOnByte = new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS);
}
} else {
this.publishMaxMessageRate = 0;
this.publishMaxByteRate = 0;
this.publishThrottlingEnabled = false;
topicPublishRateLimiterOnMessage = null;
topicPublishRateLimiterOnByte = null;
}
}

@Override
public boolean tryAcquire(int numbers, long bytes) {
return (topicPublishRateLimiterOnMessage == null || topicPublishRateLimiterOnMessage.tryAcquire(numbers)) &&
(topicPublishRateLimiterOnByte == null || topicPublishRateLimiterOnByte.tryAcquire(bytes));
}
}

class PublishRateLimiterImpl implements PublishRateLimiter {
Expand Down Expand Up @@ -153,6 +233,11 @@ public void update(PublishRate maxPublishRate) {
resetPublishCount();
}
}

@Override
public boolean tryAcquire(int numbers, long bytes) {
return false;
}
}

class PublishRateLimiterDisable implements PublishRateLimiter {
Expand Down Expand Up @@ -189,4 +274,11 @@ public void update(Policies policies, String clusterName) {
public void update(PublishRate maxPublishRate) {
// No-op
}

@Override
public boolean tryAcquire(int numbers, long bytes) {
// No-op
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ public class ServerCnx extends PulsarHandler {
private final int maxMessageSize;
private boolean preciseDispatcherFlowControl;

private boolean preciseTopicPublishRateLimitingEnable;

// Flag to manage throttling-rate by atomically enable/disable read-channel.
private volatile boolean autoReadDisabledRateLimiting = false;
private FeatureFlags features;
Expand Down Expand Up @@ -193,6 +195,7 @@ public ServerCnx(PulsarService pulsar) {
this.maxPendingSendRequests = pulsar.getConfiguration().getMaxPendingPublishdRequestsPerConnection();
this.resumeReadsThreshold = maxPendingSendRequests / 2;
this.preciseDispatcherFlowControl = pulsar.getConfiguration().isPreciseDispatcherFlowControl();
this.preciseTopicPublishRateLimitingEnable = pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable();
}

@Override
Expand Down Expand Up @@ -1206,7 +1209,7 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
}
}

startSendOperation(producer, headersAndPayload.readableBytes());
startSendOperation(producer, headersAndPayload.readableBytes(), send.getNumMessages());

// Persist the message
if (send.hasHighestSequenceId() && send.getSequenceId() <= send.getHighestSequenceId()) {
Expand Down Expand Up @@ -1792,9 +1795,21 @@ public boolean isWritable() {
return ctx.channel().isWritable();
}

public void startSendOperation(Producer producer, int msgSize) {

public void startSendOperation(Producer producer, int msgSize, int numMessages) {
MSG_PUBLISH_BUFFER_SIZE_UPDATER.getAndAdd(this, msgSize);
boolean isPublishRateExceeded = producer.getTopic().isPublishRateExceeded();
boolean isPublishRateExceeded = false;
if (preciseTopicPublishRateLimitingEnable) {
boolean isPreciseTopicPublishRateExceeded = producer.getTopic().isTopicPublishRateExceeded(numMessages, msgSize);
if (isPreciseTopicPublishRateExceeded) {
producer.getTopic().disableCnxAutoRead();
return;
}
isPublishRateExceeded = producer.getTopic().isBrokerPublishRateExceeded();
} else {
isPublishRateExceeded = producer.getTopic().isPublishRateExceeded();
}

if (++pendingSendRequest == maxPendingSendRequests || isPublishRateExceeded) {
// When the quota of pending send requests is reached, stop reading from socket to cause backpressure on
// client connection, possibly shared between multiple producers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,14 @@ CompletableFuture<Subscription> createSubscription(String subscriptionName, Init

boolean isPublishRateExceeded();

boolean isTopicPublishRateExceeded(int msgSize, int numMessages);

boolean isBrokerPublishRateExceeded();

void disableCnxAutoRead();

void enableCnxAutoRead();

CompletableFuture<Void> onPoliciesUpdate(Policies data);

boolean isBacklogQuotaExceeded(String producerName);
Expand Down
Loading