Skip to content

Commit

Permalink
[Broker] Fix set-publish-rate when using preciseTopicPublishRateLimit…
Browse files Browse the repository at this point in the history
…erEnable=true (#10384)

### Motivation

When using `preciseTopicPublishRateLimiterEnable=true` (introduced by #7078) setting for rate limiting, there are various issues:
- updating the limits doesn't set either boundary when changing the limits from a bounded limit to unbounded.
- each topic will create a scheduler thread for each limiter instance 
- each topic will never release the scheduler thread when the topic gets unloaded / closed
- updating the limits didn't close the scheduler thread related to the replaced limiter instance

### Modifications

- Fix updating of the limits by cleaning up the previous limiter instances before creating new limiter instances
- Use `brokerService.pulsar().getExecutor()` as the scheduler for the rate limiter instances
- Add resource cleanup hooks for topic closing (unload)

### Open issue 

The existing code has a difference in passing the `rateLimitFunction`:
https://github.com/apache/pulsar/blob/69a173a82c89893f54dbe5b6f422249f66ea5418/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java#L80-L86
It's passed to the `topicPublishRateLimiterOnMessage`, but not to `topicPublishRateLimiterOnByte` . It is unclear whether this is intentional.
The `rateLimitFunction` is `() -> this.enableCnxAutoRead()`
https://github.com/apache/pulsar/blob/69a173a82c89893f54dbe5b6f422249f66ea5418/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java#L913
(This also raises a question whether rate limiting works consistently when multiple topics share the same connection.)
  • Loading branch information
lhotari authored Aug 3, 2021
1 parent e6909c6 commit ded806f
Show file tree
Hide file tree
Showing 13 changed files with 201 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,7 @@ protected void updatePublishDispatcher(PublishRate publishRate) {
// create new rateLimiter if rate-limiter is disabled
if (preciseTopicPublishRateLimitingEnable) {
this.topicPublishRateLimiter = new PrecisPublishLimiter(publishRate,
() -> this.enableCnxAutoRead());
() -> this.enableCnxAutoRead(), brokerService.pulsar().getExecutor());
} else {
this.topicPublishRateLimiter = new PublishRateLimiterImpl(publishRate);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
package org.apache.pulsar.broker.service;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.util.RateLimitFunction;
Expand All @@ -27,30 +27,37 @@
public class PrecisPublishLimiter implements PublishRateLimiter {
protected volatile int publishMaxMessageRate = 0;
protected volatile long publishMaxByteRate = 0;
protected volatile boolean publishThrottlingEnabled = false;
// precise mode for publish rate limiter
private RateLimiter topicPublishRateLimiterOnMessage;
private RateLimiter topicPublishRateLimiterOnByte;
private volatile RateLimiter topicPublishRateLimiterOnMessage;
private volatile RateLimiter topicPublishRateLimiterOnByte;
private final RateLimitFunction rateLimitFunction;
private final ScheduledExecutorService scheduledExecutorService;

public PrecisPublishLimiter(Policies policies, String clusterName, RateLimitFunction rateLimitFunction) {
this.rateLimitFunction = rateLimitFunction;
update(policies, clusterName);
this.scheduledExecutorService = null;
}

public PrecisPublishLimiter(PublishRate publishRate, RateLimitFunction rateLimitFunction) {
this(publishRate, rateLimitFunction, null);
}

public PrecisPublishLimiter(PublishRate publishRate, RateLimitFunction rateLimitFunction,
ScheduledExecutorService scheduledExecutorService) {
this.rateLimitFunction = rateLimitFunction;
update(publishRate);
this.scheduledExecutorService = scheduledExecutorService;
}

@Override
public void checkPublishRate() {
// No-op
// No-op
}

@Override
public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
// No-op
// No-op
}

@Override
Expand All @@ -62,10 +69,15 @@ public boolean resetPublishCount() {
public boolean isPublishRateExceeded() {
return false;
}

// If all rate limiters are not exceeded, re-enable auto read from socket.
private void tryReleaseConnectionThrottle() {
if ((topicPublishRateLimiterOnMessage != null && topicPublishRateLimiterOnMessage.getAvailablePermits() <= 0)
|| (topicPublishRateLimiterOnByte != null && topicPublishRateLimiterOnByte.getAvailablePermits() <= 0)) {
RateLimiter currentTopicPublishRateLimiterOnMessage = topicPublishRateLimiterOnMessage;
RateLimiter currentTopicPublishRateLimiterOnByte = topicPublishRateLimiterOnByte;
if ((currentTopicPublishRateLimiterOnMessage != null
&& currentTopicPublishRateLimiterOnMessage.getAvailablePermits() <= 0)
|| (currentTopicPublishRateLimiterOnByte != null
&& currentTopicPublishRateLimiterOnByte.getAvailablePermits() <= 0)) {
return;
}
this.rateLimitFunction.apply();
Expand All @@ -78,34 +90,73 @@ public void update(Policies policies, String clusterName) {
: null;
this.update(maxPublishRate);
}

public void update(PublishRate maxPublishRate) {
if (maxPublishRate != null
&& (maxPublishRate.publishThrottlingRateInMsg > 0 || maxPublishRate.publishThrottlingRateInByte > 0)) {
this.publishThrottlingEnabled = true;
this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0);
this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
if (this.publishMaxMessageRate > 0) {
topicPublishRateLimiterOnMessage =
new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS,
this::tryReleaseConnectionThrottle, true);
replaceLimiters(() -> {
if (maxPublishRate != null
&& (maxPublishRate.publishThrottlingRateInMsg > 0
|| maxPublishRate.publishThrottlingRateInByte > 0)) {
this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0);
this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
if (this.publishMaxMessageRate > 0) {
topicPublishRateLimiterOnMessage =
RateLimiter.builder()
.scheduledExecutorService(scheduledExecutorService)
.permits(publishMaxMessageRate)
.rateLimitFunction(this::tryReleaseConnectionThrottle)
.isDispatchOrPrecisePublishRateLimiter(true)
.build();
}
if (this.publishMaxByteRate > 0) {
topicPublishRateLimiterOnByte = RateLimiter.builder()
.scheduledExecutorService(scheduledExecutorService)
.permits(publishMaxByteRate)
.rateLimitFunction(this::tryReleaseConnectionThrottle)
.isDispatchOrPrecisePublishRateLimiter(true)
.build();
}
} else {
this.publishMaxMessageRate = 0;
this.publishMaxByteRate = 0;
}
if (this.publishMaxByteRate > 0) {
topicPublishRateLimiterOnByte =
new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS,
this::tryReleaseConnectionThrottle, true);
}
} else {
this.publishMaxMessageRate = 0;
this.publishMaxByteRate = 0;
this.publishThrottlingEnabled = false;
topicPublishRateLimiterOnMessage = null;
topicPublishRateLimiterOnByte = null;
}
});
}

@Override
public boolean tryAcquire(int numbers, long bytes) {
return (topicPublishRateLimiterOnMessage == null || topicPublishRateLimiterOnMessage.tryAcquire(numbers))
&& (topicPublishRateLimiterOnByte == null || topicPublishRateLimiterOnByte.tryAcquire(bytes));
RateLimiter currentTopicPublishRateLimiterOnMessage = topicPublishRateLimiterOnMessage;
RateLimiter currentTopicPublishRateLimiterOnByte = topicPublishRateLimiterOnByte;
return (currentTopicPublishRateLimiterOnMessage == null
|| currentTopicPublishRateLimiterOnMessage.tryAcquire(numbers))
&& (currentTopicPublishRateLimiterOnByte == null
|| currentTopicPublishRateLimiterOnByte.tryAcquire(bytes));
}

@Override
public void close() throws Exception {
rateLimitFunction.apply();
replaceLimiters(null);
}

private void replaceLimiters(Runnable updater) {
RateLimiter previousTopicPublishRateLimiterOnMessage = topicPublishRateLimiterOnMessage;
topicPublishRateLimiterOnMessage = null;
RateLimiter previousTopicPublishRateLimiterOnByte = topicPublishRateLimiterOnByte;
topicPublishRateLimiterOnByte = null;
try {
if (updater != null) {
updater.run();
}
} finally {
// Close previous limiters to prevent resource leakages.
// Delay closing of previous limiters after new ones are in place so that updating the limiter
// doesn't cause unavailability.
if (previousTopicPublishRateLimiterOnMessage != null) {
previousTopicPublishRateLimiterOnMessage.close();
}
if (previousTopicPublishRateLimiterOnByte != null) {
previousTopicPublishRateLimiterOnByte.close();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;

public interface PublishRateLimiter {
public interface PublishRateLimiter extends AutoCloseable {

PublishRateLimiter DISABLED_RATE_LIMITER = PublishRateLimiterDisable.DISABLED_RATE_LIMITER;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,8 @@ public boolean tryAcquire(int numbers, long bytes) {
return true;
}

@Override
public void close() throws Exception {
// No-op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,9 @@ public void update(PublishRate maxPublishRate) {
public boolean tryAcquire(int numbers, long bytes) {
return false;
}

@Override
public void close() throws Exception {
// no-op
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,13 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect

replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
if (topicPublishRateLimiter != null) {
try {
topicPublishRateLimiter.close();
} catch (Exception e) {
log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e);
}
}
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
if (this.resourceGroupPublishLimiter != null) {
this.resourceGroupPublishLimiter.unregisterRateLimitFunction(this.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1149,6 +1149,13 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
futures.add(transactionBuffer.closeAsync());
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
if (topicPublishRateLimiter != null) {
try {
topicPublishRateLimiter.close();
} catch (Exception e) {
log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e);
}
}
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
if (this.resourceGroupPublishLimiter != null) {
this.resourceGroupPublishLimiter.unregisterRateLimitFunction(this.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private synchronized void updateSubscribeRate(ConsumerIdentifier consumerIdentif
if (this.subscribeRateLimiter.get(consumerIdentifier) == null) {
this.subscribeRateLimiter.put(consumerIdentifier,
new RateLimiter(brokerService.pulsar().getExecutor(), ratePerConsumer,
ratePeriod, TimeUnit.SECONDS, null));
ratePeriod, TimeUnit.SECONDS));
} else {
this.subscribeRateLimiter.get(consumerIdentifier)
.setRate(ratePerConsumer, ratePeriod, TimeUnit.SECONDS,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.testng.annotations.Test;

public class PrecisPublishLimiterTest {

@Test
void shouldResetMsgLimitAfterUpdate() {
PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(), () -> {
});
precisPublishLimiter.update(new PublishRate(1, 1));
assertFalse(precisPublishLimiter.tryAcquire(99, 99));
precisPublishLimiter.update(new PublishRate(-1, 100));
assertTrue(precisPublishLimiter.tryAcquire(99, 99));
}

@Test
void shouldResetBytesLimitAfterUpdate() {
PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(), () -> {
});
precisPublishLimiter.update(new PublishRate(1, 1));
assertFalse(precisPublishLimiter.tryAcquire(99, 99));
precisPublishLimiter.update(new PublishRate(100, -1));
assertTrue(precisPublishLimiter.tryAcquire(99, 99));
}

@Test
void shouldCloseResources() throws Exception {
for (int i = 0; i < 20000; i++) {
PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(100, 100), () -> {
});
precisPublishLimiter.tryAcquire(99, 99);
precisPublishLimiter.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import lombok.Builder;

/**
* A Rate Limiter that distributes permits at a configurable rate. Each {@link #acquire()} blocks if necessary until a
Expand All @@ -48,7 +49,6 @@
* </ul>
*/
public class RateLimiter implements AutoCloseable{

private final ScheduledExecutorService executorService;
private long rateTime;
private TimeUnit timeUnit;
Expand All @@ -63,7 +63,7 @@ public class RateLimiter implements AutoCloseable{
private boolean isDispatchOrPrecisePublishRateLimiter;

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) {
this(null, permits, rateTime, timeUnit, null);
this(null, permits, rateTime, timeUnit);
}

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, boolean isDispatchOrPrecisePublishRateLimiter) {
Expand All @@ -82,13 +82,26 @@ public RateLimiter(final long permits, final long rateTime, final TimeUnit timeU
this.rateLimitFunction = autoReadResetFunction;
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit) {
this(service, permits, rateTime, timeUnit, (Supplier<Long>) null);
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit, Supplier<Long> permitUpdater) {
this(service, permits, rateTime, timeUnit, permitUpdater, false);
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit, Supplier<Long> permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter) {
this(service, permits, rateTime, timeUnit, permitUpdater, isDispatchOrPrecisePublishRateLimiter,
null);
}

@Builder
RateLimiter(final ScheduledExecutorService scheduledExecutorService, final long permits, final long rateTime,
final TimeUnit timeUnit, Supplier<Long> permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter,
RateLimitFunction rateLimitFunction) {
checkArgument(permits > 0, "rate must be > 0");
checkArgument(rateTime > 0, "Renew permit time must be > 0");

Expand All @@ -98,8 +111,8 @@ public RateLimiter(final ScheduledExecutorService service, final long permits, f
this.permitUpdater = permitUpdater;
this.isDispatchOrPrecisePublishRateLimiter = isDispatchOrPrecisePublishRateLimiter;

if (service != null) {
this.executorService = service;
if (scheduledExecutorService != null) {
this.executorService = scheduledExecutorService;
this.externalExecutor = true;
} else {
final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
Expand All @@ -109,6 +122,14 @@ public RateLimiter(final ScheduledExecutorService service, final long permits, f
this.externalExecutor = false;
}

this.rateLimitFunction = rateLimitFunction;

}

// default values for Lombok generated builder class
public static class RateLimiterBuilder {
private long rateTime = 1;
private TimeUnit timeUnit = TimeUnit.SECONDS;
}

@Override
Expand Down
Loading

0 comments on commit ded806f

Please sign in to comment.