Skip to content

Commit

Permalink
[ISSUE #6797]Support batch ack when reput buffer ak to store in PopBu…
Browse files Browse the repository at this point in the history
…fferMergeService (#6798)

* add back for PopReviveService

* add batch ack for PopReviveService
  • Loading branch information
DongyuanPan authored May 24, 2023
1 parent eef581b commit 985319b
Show file tree
Hide file tree
Showing 7 changed files with 246 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import com.alibaba.fastjson.JSON;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
Expand All @@ -39,6 +41,7 @@
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.BatchAckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;

public class PopBufferMergeService extends ServiceThread {
Expand All @@ -59,6 +62,7 @@ public class PopBufferMergeService extends ServiceThread {
private final int countOfSecond1 = (int) (1000 / interval);
private final int countOfSecond30 = (int) (30 * 1000 / interval);

private final List<Byte> batchAckIndexList = new ArrayList(32);
private volatile boolean master = false;

public PopBufferMergeService(BrokerController brokerController, PopMessageProcessor popMessageProcessor) {
Expand Down Expand Up @@ -268,13 +272,36 @@ private void scan() {
continue;
}

for (byte i = 0; i < point.getNum(); i++) {
// reput buffer ak to store
if (DataConverter.getBit(pointWrapper.getBits().get(), i)
&& !DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) {
if (putAckToStore(pointWrapper, i)) {
count++;
markBitCAS(pointWrapper.getToStoreBits(), i);
if (brokerController.getBrokerConfig().isEnablePopBatchAck()) {
List<Byte> indexList = this.batchAckIndexList;
try {
for (byte i = 0; i < point.getNum(); i++) {
// reput buffer ak to store
if (DataConverter.getBit(pointWrapper.getBits().get(), i)
&& !DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) {
indexList.add(i);
}
}
if (indexList.size() > 0) {
if (putBatchAckToStore(pointWrapper, indexList)) {
count += indexList.size();
for (Byte i : indexList) {
markBitCAS(pointWrapper.getToStoreBits(), i);
}
}
}
} finally {
indexList.clear();
}
} else {
for (byte i = 0; i < point.getNum(); i++) {
// reput buffer ak to store
if (DataConverter.getBit(pointWrapper.getBits().get(), i)
&& !DataConverter.getBit(pointWrapper.getToStoreBits().get(), i)) {
if (putAckToStore(pointWrapper, i)) {
count++;
markBitCAS(pointWrapper.getToStoreBits(), i);
}
}
}
}
Expand Down Expand Up @@ -606,6 +633,45 @@ private boolean putAckToStore(final PopCheckPointWrapper pointWrapper, byte msgI
return true;
}

private boolean putBatchAckToStore(final PopCheckPointWrapper pointWrapper, final List<Byte> msgIndexList) {
PopCheckPoint point = pointWrapper.getCk();
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
final BatchAckMsg batchAckMsg = new BatchAckMsg();

for (Byte msgIndex : msgIndexList) {
batchAckMsg.getAckOffsetList().add(point.ackOffsetByIndex(msgIndex));
}
batchAckMsg.setStartOffset(point.getStartOffset());
batchAckMsg.setConsumerGroup(point.getCId());
batchAckMsg.setTopic(point.getTopic());
batchAckMsg.setQueueId(point.getQueueId());
batchAckMsg.setPopTime(point.getPopTime());
msgInner.setTopic(popMessageProcessor.reviveTopic);
msgInner.setBody(JSON.toJSONString(batchAckMsg).getBytes(DataConverter.charset));
msgInner.setQueueId(pointWrapper.getReviveQueueId());
msgInner.setTags(PopAckConstants.BATCH_ACK_TAG);
msgInner.setBornTimestamp(System.currentTimeMillis());
msgInner.setBornHost(brokerController.getStoreHost());
msgInner.setStoreHost(brokerController.getStoreHost());
msgInner.setDeliverTimeMs(point.getReviveTime());
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genBatchAckUniqueId(batchAckMsg));

msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
&& putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_DISK_TIMEOUT
&& putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_SLAVE_TIMEOUT
&& putMessageResult.getPutMessageStatus() != PutMessageStatus.SLAVE_NOT_AVAILABLE) {
POP_LOGGER.error("[PopBuffer]put batch ack to store fail: {}, {}, {}", pointWrapper, batchAckMsg, putMessageResult);
return false;
}
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("[PopBuffer]put batch ack to store ok: {}, {}, {}", pointWrapper, batchAckMsg, putMessageResult);
}

return true;
}

private boolean cancelCkTimer(final PopCheckPointWrapper pointWrapper) {
// not stored, no need cancel
if (pointWrapper.getReviveQueueOffset() < 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.BatchAckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;

import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
Expand Down Expand Up @@ -136,6 +137,15 @@ public static String genAckUniqueId(AckMsg ackMsg) {
+ PopAckConstants.SPLIT + PopAckConstants.ACK_TAG;
}

public static String genBatchAckUniqueId(BatchAckMsg batchAckMsg) {
return batchAckMsg.getTopic()
+ PopAckConstants.SPLIT + batchAckMsg.getQueueId()
+ PopAckConstants.SPLIT + batchAckMsg.getAckOffsetList().toString()
+ PopAckConstants.SPLIT + batchAckMsg.getConsumerGroup()
+ PopAckConstants.SPLIT + batchAckMsg.getPopTime()
+ PopAckConstants.SPLIT + PopAckConstants.BATCH_ACK_TAG;
}

public static String genCkUniqueId(PopCheckPoint ck) {
return ck.getTopic()
+ PopAckConstants.SPLIT + ck.getQueueId()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.pop.AckMsg;
import org.apache.rocketmq.store.pop.BatchAckMsg;
import org.apache.rocketmq.store.pop.PopCheckPoint;

import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
Expand Down Expand Up @@ -382,18 +383,8 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
if (!brokerController.getBrokerConfig().isEnableSkipLongAwaitingAck()) {
continue;
}
long ackWaitTime = System.currentTimeMillis() - messageExt.getDeliverTimeMs();
long reviveAckWaitMs = brokerController.getBrokerConfig().getReviveAckWaitMs();
if (ackWaitTime > reviveAckWaitMs) {
// will use the reviveOffset of popCheckPoint to commit offset in mergeAndRevive
PopCheckPoint mockPoint = createMockCkForAck(ackMsg, messageExt.getQueueOffset());
POP_LOGGER.warn(
"ack wait for {}ms cannot find ck, skip this ack. mergeKey:{}, ack:{}, mockCk:{}",
reviveAckWaitMs, mergeKey, ackMsg, mockPoint);
mockPointMap.put(mergeKey, mockPoint);
if (firstRt == 0) {
firstRt = mockPoint.getReviveTime();
}
if (mockCkForAck(messageExt, ackMsg, mergeKey, mockPointMap) && firstRt == 0) {
firstRt = mockPointMap.get(mergeKey).getReviveTime();
}
} else {
int indexOfAck = point.indexOfAck(ackMsg.getAckOffset());
Expand All @@ -403,6 +394,34 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
POP_LOGGER.error("invalid ack index, {}, {}", ackMsg, point);
}
}
} else if (PopAckConstants.BATCH_ACK_TAG.equals(messageExt.getTags())) {
String raw = new String(messageExt.getBody(), DataConverter.charset);
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("reviveQueueId={}, find batch ack, offset:{}, raw : {}", messageExt.getQueueId(), messageExt.getQueueOffset(), raw);
}

BatchAckMsg bAckMsg = JSON.parseObject(raw, BatchAckMsg.class);
PopMetricsManager.incPopReviveAckGetCount(bAckMsg, queueId);
String mergeKey = bAckMsg.getTopic() + bAckMsg.getConsumerGroup() + bAckMsg.getQueueId() + bAckMsg.getStartOffset() + bAckMsg.getPopTime();
PopCheckPoint point = map.get(mergeKey);
if (point == null) {
if (!brokerController.getBrokerConfig().isEnableSkipLongAwaitingAck()) {
continue;
}
if (mockCkForAck(messageExt, bAckMsg, mergeKey, mockPointMap) && firstRt == 0) {
firstRt = mockPointMap.get(mergeKey).getReviveTime();
}
} else {
List<Long> ackOffsetList = bAckMsg.getAckOffsetList();
for (Long ackOffset : ackOffsetList) {
int indexOfAck = point.indexOfAck(ackOffset);
if (indexOfAck > -1) {
point.setBitMap(DataConverter.setBit(point.getBitMap(), indexOfAck, true));
} else {
POP_LOGGER.error("invalid batch ack index, {}, {}", bAckMsg, point);
}
}
}
}
long deliverTime = messageExt.getDeliverTimeMs();
if (deliverTime > endTime) {
Expand All @@ -415,6 +434,21 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
consumeReviveObj.endTime = endTime;
}

private boolean mockCkForAck(MessageExt messageExt, AckMsg ackMsg, String mergeKey, HashMap<String, PopCheckPoint> mockPointMap) {
long ackWaitTime = System.currentTimeMillis() - messageExt.getDeliverTimeMs();
long reviveAckWaitMs = brokerController.getBrokerConfig().getReviveAckWaitMs();
if (ackWaitTime > reviveAckWaitMs) {
// will use the reviveOffset of popCheckPoint to commit offset in mergeAndRevive
PopCheckPoint mockPoint = createMockCkForAck(ackMsg, messageExt.getQueueOffset());
POP_LOGGER.warn(
"ack wait for {}ms cannot find ck, skip this ack. mergeKey:{}, ack:{}, mockCk:{}",
reviveAckWaitMs, mergeKey, ackMsg, mockPoint);
mockPointMap.put(mergeKey, mockPoint);
return true;
}
return false;
}

private PopCheckPoint createMockCkForAck(AckMsg ackMsg, long reviveOffset) {
PopCheckPoint point = new PopCheckPoint();
point.setStartOffset(ackMsg.getStartOffset());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ public class BrokerConfig extends BrokerIdentity {
private int popCkStayBufferTimeOut = 3 * 1000;
private int popCkMaxBufferSize = 200000;
private int popCkOffsetMaxQueueSize = 20000;
private boolean enablePopBatchAck = false;
private boolean enableNotifyAfterPopOrderLockRelease = true;

private boolean realTimeNotifyConsumerChange = true;
Expand Down Expand Up @@ -499,6 +500,14 @@ public void setPopCkOffsetMaxQueueSize(int popCkOffsetMaxQueueSize) {
this.popCkOffsetMaxQueueSize = popCkOffsetMaxQueueSize;
}

public boolean isEnablePopBatchAck() {
return enablePopBatchAck;
}

public void setEnablePopBatchAck(boolean enablePopBatchAck) {
this.enablePopBatchAck = enablePopBatchAck;
}

public boolean isEnableSkipLongAwaitingAck() {
return enableSkipLongAwaitingAck;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class PopAckConstants {
public static final String REVIVE_TOPIC = TopicValidator.SYSTEM_TOPIC_PREFIX + "REVIVE_LOG_";
public static final String CK_TAG = "ck";
public static final String ACK_TAG = "ack";
public static final String BATCH_ACK_TAG = "bAck";
public static final String SPLIT = "@";

/**
Expand Down
50 changes: 50 additions & 0 deletions store/src/main/java/org/apache/rocketmq/store/pop/BatchAckMsg.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store.pop;

import com.alibaba.fastjson.annotation.JSONField;
import java.util.ArrayList;
import java.util.List;


public class BatchAckMsg extends AckMsg {
@JSONField(name = "aol", alternateNames = {"ackOffsetList"})
private List<Long> ackOffsetList = new ArrayList(32);


public List<Long> getAckOffsetList() {
return ackOffsetList;
}

public void setAckOffsetList(List<Long> ackOffsetList) {
this.ackOffsetList = ackOffsetList;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("BatchAckMsg{");
sb.append("ackOffsetList=").append(ackOffsetList);
sb.append(", startOffset=").append(getStartOffset());
sb.append(", consumerGroup='").append(getConsumerGroup()).append('\'');
sb.append(", topic='").append(getTopic()).append('\'');
sb.append(", queueId=").append(getQueueId());
sb.append(", popTime=").append(getPopTime());
sb.append(", brokerName=").append(getBrokerName());
sb.append('}');
return sb.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.store.pop;

import com.alibaba.fastjson.JSON;
import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

public class BatchAckMsgTest {

@Test
public void testSerializeAndDeSerialize() {
String longString = "{\"ackOffsetList\":[100, 101],\"consumerGroup\":\"group\"," +
"\"popTime\":1679454922000,\"queueId\":3,\"startOffset\":200,\"topic\":\"topic\"}";

BatchAckMsg batchAckMsg = new BatchAckMsg();
List<Long> aol = new ArrayList<>(32);
aol.add(100L);
aol.add(101L);

batchAckMsg.setAckOffsetList(aol);
batchAckMsg.setStartOffset(200L);
batchAckMsg.setConsumerGroup("group");
batchAckMsg.setTopic("topic");
batchAckMsg.setQueueId(3);
batchAckMsg.setPopTime(1679454922000L);

String jsonString = JSON.toJSONString(batchAckMsg);
BatchAckMsg batchAckMsg1 = JSON.parseObject(jsonString, BatchAckMsg.class);
BatchAckMsg batchAckMsg2 = JSON.parseObject(longString, BatchAckMsg.class);

Assert.assertEquals(batchAckMsg1.getAckOffsetList(), batchAckMsg2.getAckOffsetList());
Assert.assertEquals(batchAckMsg1.getTopic(), batchAckMsg2.getTopic());
Assert.assertEquals(batchAckMsg1.getConsumerGroup(), batchAckMsg2.getConsumerGroup());
Assert.assertEquals(batchAckMsg1.getQueueId(), batchAckMsg2.getQueueId());
Assert.assertEquals(batchAckMsg1.getStartOffset(), batchAckMsg2.getStartOffset());
Assert.assertEquals(batchAckMsg1.getPopTime(), batchAckMsg2.getPopTime());
}
}

0 comments on commit 985319b

Please sign in to comment.