Skip to content

Commit

Permalink
[ISSUE #6230] Optimizes ScheduleMessageService code and logic (#6231)
Browse files Browse the repository at this point in the history
Co-authored-by: loboxu <[email protected]>
  • Loading branch information
socutes and loboxu authored Mar 6, 2023
1 parent cc8823c commit b9bbd52
Showing 1 changed file with 22 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@
package org.apache.rocketmq.broker.schedule;

import io.opentelemetry.api.common.Attributes;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
Expand All @@ -32,8 +29,8 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
import org.apache.rocketmq.common.ConfigManager;
Expand Down Expand Up @@ -96,14 +93,11 @@ public ScheduleMessageService(final BrokerController brokerController) {
this.enableAsyncDeliver = brokerController.getMessageStoreConfig().isEnableScheduleAsyncDeliver();
scheduledPersistService = new ScheduledThreadPoolExecutor(1,
new ThreadFactoryImpl("ScheduleMessageServicePersistThread", true, brokerController.getBrokerConfig()));
scheduledPersistService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
scheduledPersistService.scheduleAtFixedRate(() -> {
try {
ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}, 10000, this.brokerController.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
}
Expand All @@ -117,9 +111,7 @@ public static int delayLevel2QueueId(final int delayLevel) {
}

public void buildRunningStats(HashMap<String, String> stats) {
Iterator<Map.Entry<Integer, Long>> it = this.offsetTable.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<Integer, Long> next = it.next();
for (Map.Entry<Integer, Long> next : this.offsetTable.entrySet()) {
int queueId = delayLevel2QueueId(next.getKey());
long delayOffset = next.getValue();
long maxOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, queueId);
Expand Down Expand Up @@ -169,17 +161,13 @@ public void start() {
}
}

this.deliverExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
if (started.get()) {
ScheduleMessageService.this.persist();
}
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
this.deliverExecutorService.scheduleAtFixedRate(() -> {
try {
if (started.get()) {
ScheduleMessageService.this.persist();
}
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}, 10000, this.brokerController.getMessageStore().getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
}
Expand Down Expand Up @@ -208,10 +196,8 @@ public void stop() {
}
}

if (this.deliverPendingTable != null) {
for (int i = 1; i <= this.deliverPendingTable.size(); i++) {
log.warn("deliverPendingTable level: {}, size: {}", i, this.deliverPendingTable.get(i).size());
}
for (int i = 1; i <= this.deliverPendingTable.size(); i++) {
log.warn("deliverPendingTable level: {}, size: {}", i, this.deliverPendingTable.get(i).size());
}

this.persist();
Expand Down Expand Up @@ -378,17 +364,6 @@ private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
return msgInner;
}

public int computeDelayLevel(long timeMillis) {
long intervalMillis = timeMillis - System.currentTimeMillis();
List<Map.Entry<Integer, Long>> sortedLevels = delayLevelTable.entrySet().stream().sorted(Comparator.comparingLong(Map.Entry::getValue)).collect(Collectors.toList());
for (Map.Entry<Integer, Long> entry : sortedLevels) {
if (entry.getValue() > intervalMillis) {
return entry.getKey();
}
}
return sortedLevels.get(sortedLevels.size() - 1).getKey();
}

class DeliverDelayedMessageTimerTask implements Runnable {
private final int delayLevel;
private final long offset;
Expand All @@ -402,7 +377,7 @@ public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
public void run() {
try {
if (isStarted()) {
this.executeOnTimeup();
this.executeOnTimeUp();
}
} catch (Exception e) {
// XXX: warn and notify me
Expand All @@ -411,9 +386,6 @@ public void run() {
}
}

/**
* @return
*/
private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {

long result = deliverTimestamp;
Expand All @@ -426,7 +398,7 @@ private long correctDeliverTimestamp(final long now, final long deliverTimestamp
return result;
}

public void executeOnTimeup() {
public void executeOnTimeUp() {
ConsumeQueueInterface cq =
ScheduleMessageService.this.brokerController.getMessageStore().getConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
Expand Down Expand Up @@ -633,7 +605,7 @@ public class PutResultProcess {
private boolean autoResend = false;
private CompletableFuture<PutMessageResult> future;

private volatile int resendCount = 0;
private volatile AtomicInteger resendCount = new AtomicInteger(0);
private volatile ProcessStatus status = ProcessStatus.RUNNING;

public PutResultProcess setTopic(String topic) {
Expand Down Expand Up @@ -712,7 +684,7 @@ public CompletableFuture<PutMessageResult> getFuture() {
return future;
}

public int getResendCount() {
public AtomicInteger getResendCount() {
return resendCount;
}

Expand Down Expand Up @@ -795,7 +767,7 @@ public void doResend() {

// Gradually increase the resend interval.
try {
Thread.sleep(Math.min(this.resendCount++ * 100, 60 * 1000));
Thread.sleep(Math.min(this.resendCount.incrementAndGet() * 100, 60 * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
Expand Down Expand Up @@ -823,13 +795,13 @@ public void doResend() {
public boolean need2Blocked() {
int maxResendNum2Blocked = ScheduleMessageService.this.brokerController.getMessageStore().getMessageStoreConfig()
.getScheduleAsyncDeliverMaxResendNum2Blocked();
return this.resendCount > maxResendNum2Blocked;
return this.resendCount.get() > maxResendNum2Blocked;
}

public boolean need2Skip() {
int maxResendNum2Blocked = ScheduleMessageService.this.brokerController.getMessageStore().getMessageStoreConfig()
.getScheduleAsyncDeliverMaxResendNum2Blocked();
return this.resendCount > maxResendNum2Blocked * 2;
return this.resendCount.get() > maxResendNum2Blocked * 2;
}

@Override
Expand Down

0 comments on commit b9bbd52

Please sign in to comment.