[fix][broker] Fix dispatch duplicated messages with Exclusive mode. (apache#17237)

(cherry picked from commit 0517423)
(cherry picked from commit 390a4ed)
mattisonchao authored and nicoloboschi committed Dec 7, 2022
1 parent 2a82a1f commit 03074ba
Showing 3 changed files with 161 additions and 50 deletions.
@@ -28,6 +28,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -58,6 +59,7 @@
 public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcherSingleActiveConsumer
         implements Dispatcher, ReadEntriesCallback {
 
+    private final AtomicBoolean isRescheduleReadInProgress = new AtomicBoolean(false);
     protected final PersistentTopic topic;
     protected final String name;
     private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
@@ -227,14 +229,7 @@ protected void dispatchEntriesToConsumer(Consumer currentConsumer, List<Entry> e
                             SafeRun.safeRun(() -> {
                                 synchronized (PersistentDispatcherSingleActiveConsumer.this) {
                                     Consumer newConsumer = getActiveConsumer();
-                                    if (newConsumer != null && !havePendingRead) {
-                                        readMoreEntries(newConsumer);
-                                    } else {
-                                        log.debug(
-                                                "[{}-{}] Ignoring write future complete."
-                                                        + " consumerAvailable={} havePendingRead={}",
-                                                name, newConsumer, newConsumer != null, havePendingRead);
-                                    }
+                                    readMoreEntries(newConsumer);
                                 }
                             }));
     }
@@ -320,25 +315,40 @@ protected void readMoreEntries(Consumer consumer) {
         // consumer can be null when all consumers are disconnected from broker.
         // so skip reading more entries if currently there is no active consumer.
         if (null == consumer) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Skipping read for the topic, Due to the current consumer is null", topic.getName());
+            }
             return;
         }
+        if (havePendingRead) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Skipping read for the topic, Due to we have pending read.", topic.getName());
+            }
+            return;
+        }
 
         if (consumer.getAvailablePermits() > 0) {
-            Pair<Integer, Long> calculateResult = calculateToRead(consumer);
-            int messagesToRead = calculateResult.getLeft();
-            long bytesToRead = calculateResult.getRight();
+            synchronized (this) {
+                if (havePendingRead) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Skipping read for the topic, Due to we have pending read.", topic.getName());
+                    }
+                    return;
+                }
 
-            if (-1 == messagesToRead || bytesToRead == -1) {
-                // Skip read as topic/dispatcher has exceed the dispatch rate.
-                return;
-            }
+                Pair<Integer, Long> calculateResult = calculateToRead(consumer);
+                int messagesToRead = calculateResult.getLeft();
+                long bytesToRead = calculateResult.getRight();
 
-            // Schedule read
-            if (log.isDebugEnabled()) {
-                log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead);
-            }
+                if (-1 == messagesToRead || bytesToRead == -1) {
+                    // Skip read as topic/dispatcher has exceed the dispatch rate.
+                    return;
+                }
 
-            synchronized (this) {
+                // Schedule read
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead);
+                }
                 havePendingRead = true;
                 if (consumer.readCompacted()) {
                     topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
@@ -359,19 +369,16 @@ protected void readMoreEntries(Consumer consumer) {

     @Override
     protected void reScheduleRead() {
-        topic.getBrokerService().executor().schedule(() -> {
-            Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
-            if (currentConsumer != null && !havePendingRead) {
-                readMoreEntries(currentConsumer);
-            } else {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Skipping read retry for topic: Current Consumer {},"
-                                    + " havePendingRead {}",
-                            topic.getName(), currentConsumer, havePendingRead);
-                }
-            }
-        }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
+        if (isRescheduleReadInProgress.compareAndSet(false, true)) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] [{}] Reschedule message read in {} ms", topic.getName(), name, MESSAGE_RATE_BACKOFF_MS);
+            }
+            topic.getBrokerService().executor().schedule(() -> {
+                isRescheduleReadInProgress.set(false);
+                Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+                readMoreEntries(currentConsumer);
+            }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
+        }
     }
 
     protected Pair<Integer, Long> calculateToRead(Consumer consumer) {
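The change above boils down to two concurrency fixes: the check of havePendingRead and the start of the next read now happen atomically inside a synchronized block, and concurrent reScheduleRead() calls are collapsed into a single pending retry via the new AtomicBoolean. Below is a minimal, self-contained sketch of that pattern; it is an illustration only, not the dispatcher class itself, and the names SingleActiveReader, readAsync and BACKOFF_MS are stand-ins for the broker's read path and MESSAGE_RATE_BACKOFF_MS.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class SingleActiveReader {
    private static final long BACKOFF_MS = 100; // stand-in for MESSAGE_RATE_BACKOFF_MS
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final AtomicBoolean rescheduleInProgress = new AtomicBoolean(false);
    private boolean havePendingRead = false; // guarded by "this"

    void readMoreEntries() {
        synchronized (this) {
            // Check-then-act under the lock: at most one read is outstanding,
            // so the same entries cannot be dispatched to the consumer twice.
            if (havePendingRead) {
                return;
            }
            havePendingRead = true;
            readAsync(); // issue the single outstanding read
        }
    }

    void reScheduleRead() {
        // Collapse concurrent retry requests into one scheduled task.
        if (rescheduleInProgress.compareAndSet(false, true)) {
            executor.schedule(() -> {
                rescheduleInProgress.set(false);
                readMoreEntries();
            }, BACKOFF_MS, TimeUnit.MILLISECONDS);
        }
    }

    void readAsync() {
        // Placeholder for the asynchronous cursor read; its completion callback
        // is expected to clear havePendingRead and request the next read.
    }
}

In the dispatcher itself, the ReadEntriesCallback completion paths play the role of that placeholder: they are the ones that clear havePendingRead before triggering the next read, so the guarded section above is the only place where a new read can be started.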
@@ -179,31 +179,49 @@ protected void readMoreEntries(Consumer consumer) {
         // consumer can be null when all consumers are disconnected from broker.
         // so skip reading more entries if currently there is no active consumer.
         if (null == consumer) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Skipping read for the topic, Due to the current consumer is null", topic.getName());
+            }
             return;
         }
+        if (havePendingRead) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Skipping read for the topic, Due to we have pending read.", topic.getName());
+            }
+            return;
+        }
 
-        if (!havePendingRead && consumer.getAvailablePermits() > 0) {
-            Pair<Integer, Long> calculateResult = calculateToRead(consumer);
-            int messagesToRead = calculateResult.getLeft();
-            long bytesToRead = calculateResult.getRight();
+        if (consumer.getAvailablePermits() > 0) {
+            synchronized (this) {
+                if (havePendingRead) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Skipping read for the topic, Due to we have pending read.", topic.getName());
+                    }
+                    return;
+                }
 
+                Pair<Integer, Long> calculateResult = calculateToRead(consumer);
+                int messagesToRead = calculateResult.getLeft();
+                long bytesToRead = calculateResult.getRight();
 
-            if (-1 == messagesToRead || bytesToRead == -1) {
-                // Skip read as topic/dispatcher has exceed the dispatch rate.
-                return;
-            }
+                if (-1 == messagesToRead || bytesToRead == -1) {
+                    // Skip read as topic/dispatcher has exceed the dispatch rate.
+                    return;
+                }
 
-            // Schedule read
-            if (log.isDebugEnabled()) {
-                log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead);
-            }
-            havePendingRead = true;
+                // Schedule read
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead);
+                }
+                havePendingRead = true;
 
-            if (consumer.readCompacted()) {
-                topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
-                        this, consumer);
-            } else {
-                streamingEntryReader.asyncReadEntries(messagesToRead, bytesToRead, consumer);
+                if (consumer.readCompacted()) {
+                    topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
+                            this, consumer);
+                } else {
+                    streamingEntryReader.asyncReadEntries(messagesToRead, bytesToRead, consumer);
+                }
             }
         } else {
             if (log.isDebugEnabled()) {
@@ -220,6 +220,92 @@ public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(Subscriptio
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(dataProvider = "subscriptions", timeOut = 30000, invocationCount = 15)
+    private void testMessageNotDuplicated(SubscriptionType subscription) throws Exception {
+        int brokerRate = 1000;
+        int topicRate = 5000;
+        int subRate = 10000;
+        int expectRate = 1000;
+        final String namespace = "my-property/throttling_ns_non_dup";
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll");
+        final String subName = "my-subscriber-name-" + subscription;
+
+        DispatchRate subscriptionDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(subRate)
+                .ratePeriodInSecond(1)
+                .build();
+        DispatchRate topicDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(topicRate)
+                .ratePeriodInSecond(1)
+                .build();
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+        admin.namespaces().setSubscriptionDispatchRate(namespace, subscriptionDispatchRate);
+        admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
+        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate);
+
+        final int numProducedMessages = 30;
+        final CountDownLatch latch = new CountDownLatch(numProducedMessages);
+        final AtomicInteger totalReceived = new AtomicInteger(0);
+        // enable throttling for nonBacklog consumers
+        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(subscription).messageListener((c1, msg) -> {
+                    Assert.assertNotNull(msg, "Message cannot be null");
+                    String receivedMessage = new String(msg.getData());
+                    log.debug("Received message [{}] in the listener", receivedMessage);
+                    totalReceived.incrementAndGet();
+                    latch.countDown();
+                }).subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+
+        DispatchRateLimiter subRateLimiter = null;
+        Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher();
+        if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
+            subRateLimiter = subDispatcher.getRateLimiter().get();
+        } else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
+            subRateLimiter = subDispatcher.getRateLimiter().get();
+        } else {
+            Assert.fail("Should only have PersistentDispatcher in this test");
+        }
+        final DispatchRateLimiter subDispatchRateLimiter = subRateLimiter;
+        Awaitility.await().atMost(Duration.ofMillis(500)).untilAsserted(() -> {
+            DispatchRateLimiter brokerDispatchRateLimiter = pulsar.getBrokerService().getBrokerDispatchRateLimiter();
+            Assert.assertTrue(brokerDispatchRateLimiter != null
+                    && brokerDispatchRateLimiter.getDispatchRateOnByte() > 0);
+            DispatchRateLimiter topicDispatchRateLimiter = topic.getDispatchRateLimiter().orElse(null);
+            Assert.assertTrue(topicDispatchRateLimiter != null
+                    && topicDispatchRateLimiter.getDispatchRateOnByte() > 0);
+            Assert.assertTrue(subDispatchRateLimiter != null
+                    && subDispatchRateLimiter.getDispatchRateOnByte() > 0);
+        });
+
+        Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(namespace)
+                .getDispatchThrottlingRateInByte(), subRate);
+        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace)
+                .getDispatchThrottlingRateInByte(), topicRate);
+
+        for (int i = 0; i < numProducedMessages; i++) {
+            producer.send(new byte[expectRate / 10]);
+        }
+
+        latch.await();
+        // Wait 2000 milli sec to check if we can get more than 30 messages.
+        Thread.sleep(2000);
+        // If this assertion failed, please alert we may have some regression cause message dispatch was duplicated.
+        Assert.assertEquals(totalReceived.get(), numProducedMessages, 10);
+
+        consumer.close();
+        producer.close();
+        admin.topics().delete(topicName, true);
+        admin.namespaces().deleteNamespace(namespace);
+    }
+
     /**
      * verify rate-limiting should throttle message-dispatching based on byte-rate
      *
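The new test infers duplicate dispatch from the total received count, with a tolerance of 10 messages. For completeness, below is a hedged sketch of a stricter, MessageId-based check; it is not part of this commit, and the helper name, the client instance and the topic/subscription arguments are assumptions standing in for the cluster and names created by the test.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

class DuplicateDeliveryChecker {
    // Returns the number of messages whose MessageId was delivered more than once.
    static int countDuplicates(PulsarClient client, String topic, String sub, int expected) throws Exception {
        Set<String> seen = ConcurrentHashMap.newKeySet();
        int duplicates = 0;
        try (Consumer<byte[]> consumer = client.newConsumer()
                .topic(topic)
                .subscriptionName(sub)
                .subscriptionType(SubscriptionType.Exclusive)
                .subscribe()) {
            for (int i = 0; i < expected; i++) {
                Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
                if (msg == null) {
                    break; // no more messages within the timeout
                }
                if (!seen.add(msg.getMessageId().toString())) {
                    duplicates++; // same MessageId delivered twice
                }
                consumer.acknowledge(msg);
            }
        }
        return duplicates;
    }
}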
