Skip to content

Commit

Permalink
Merge pull request alibaba#501 from suiyuzeng/issue_recover
Browse files Browse the repository at this point in the history
[ISSUE alibaba#467] fix Message missed after recovering from abnormal shutdown
  • Loading branch information
duhengforever authored Nov 1, 2018
2 parents 3dabb6c + d3d4225 commit edf9703
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 36 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ It offers a variety of features:
* Docs: <https://rocketmq.apache.org/docs/quick-start/>
* Issues: <https://github.com/apache/rocketmq/issues>
* Ask: <https://stackoverflow.com/questions/tagged/rocketmq>
* Slack: <https://rocketmq-community.slack.com/>
* Slack: <https://rocketmq-invite-automation.herokuapp.com/>


----------
Expand Down
71 changes: 40 additions & 31 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public SelectMappedBufferResult getData(final long offset, final boolean returnF
/**
* When the normal exit, data recovery, all memory data have been flush
*/
public void recoverNormally() {
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
if (!mappedFiles.isEmpty()) {
Expand Down Expand Up @@ -206,6 +206,12 @@ else if (!dispatchRequest.isSuccess()) {
this.mappedFileQueue.setFlushedWhere(processOffset);
this.mappedFileQueue.setCommittedWhere(processOffset);
this.mappedFileQueue.truncateDirtyFiles(processOffset);

// Clear ConsumeQueue redundant data
if (maxPhyOffsetOfConsumeQueue >= processOffset) {
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
}
}

Expand Down Expand Up @@ -390,7 +396,7 @@ public void setConfirmOffset(long phyOffset) {
this.confirmOffset = phyOffset;
}

public void recoverAbnormally() {
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
// recover by the minimum time stamp
boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
Expand Down Expand Up @@ -418,41 +424,41 @@ public void recoverAbnormally() {
DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
int size = dispatchRequest.getMsgSize();

// Normal data
if (size > 0) {
mappedFileOffset += size;
if (dispatchRequest.isSuccess()) {
// Normal data
if (size > 0) {
mappedFileOffset += size;

if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
this.defaultMessageStore.doDispatch(dispatchRequest);
}
} else {
this.defaultMessageStore.doDispatch(dispatchRequest);
}
} else {
this.defaultMessageStore.doDispatch(dispatchRequest);
}
}
// Intermediate file read error
else if (size == -1) {
// Come the end of the file, switch to the next file
// Since the return 0 representatives met last hole, this can
// not be included in truncate offset
else if (size == 0) {
index++;
if (index >= mappedFiles.size()) {
// The current branch under normal circumstances should
// not happen
log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
}
} else {
log.info("recover physics file end, " + mappedFile.getFileName());
break;
}
// Come the end of the file, switch to the next file
// Since the return 0 representatives met last hole, this can
// not be included in truncate offset
else if (size == 0) {
index++;
if (index >= mappedFiles.size()) {
// The current branch under normal circumstances should
// not happen
log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
break;
} else {
mappedFile = mappedFiles.get(index);
byteBuffer = mappedFile.sliceByteBuffer();
processOffset = mappedFile.getFileFromOffset();
mappedFileOffset = 0;
log.info("recover next physics file, " + mappedFile.getFileName());
}
}
}

processOffset += mappedFileOffset;
Expand All @@ -461,7 +467,10 @@ else if (size == 0) {
this.mappedFileQueue.truncateDirtyFiles(processOffset);

// Clear ConsumeQueue redundant data
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
if (maxPhyOffsetOfConsumeQueue >= processOffset) {
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
}
}
// Commitlog case files are deleted
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1276,12 +1276,12 @@ private boolean loadConsumeQueue() {
}

private void recover(final boolean lastExitOK) {
this.recoverConsumeQueue();
long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();

if (lastExitOK) {
this.commitLog.recoverNormally();
this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
} else {
this.commitLog.recoverAbnormally();
this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
}

this.recoverTopicQueueTable();
Expand All @@ -1306,12 +1306,18 @@ private void putConsumeQueue(final String topic, final int queueId, final Consum
}
}

private void recoverConsumeQueue() {
private long recoverConsumeQueue() {
long maxPhysicOffset = -1;
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
for (ConsumeQueue logic : maps.values()) {
logic.recover();
if (logic.getMaxPhysicOffset() > maxPhysicOffset) {
maxPhysicOffset = logic.getMaxPhysicOffset();
}
}
}

return maxPhysicOffset;
}

private void recoverTopicQueueTable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
package org.apache.rocketmq.store;

import java.io.File;
import java.io.RandomAccessFile;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.OverlappingFileLockException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -29,6 +32,7 @@
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.junit.After;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.Before;
Expand Down Expand Up @@ -171,6 +175,120 @@ public void testPullSize() throws Exception {
assertThat(getMessageResult45.getMessageBufferList().size()).isEqualTo(10);
}

@Test
public void testRecover() throws Exception {
String topic = "recoverTopic";
MessageBody = StoreMessage.getBytes();
for (int i = 0; i < 100; i++) {
MessageExtBrokerInner messageExtBrokerInner = buildMessage();
messageExtBrokerInner.setTopic(topic);
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
}

Thread.sleep(100);//wait for build consumer queue
long maxPhyOffset = messageStore.getMaxPhyOffset();
long maxCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);

//1.just reboot
messageStore.shutdown();
messageStore = buildMessageStore();
boolean load = messageStore.load();
assertTrue(load);
messageStore.start();
assertTrue(maxPhyOffset == messageStore.getMaxPhyOffset());
assertTrue(maxCqOffset == messageStore.getMaxOffsetInQueue(topic, 0));

//2.damage commitlog and reboot normal
for (int i = 0; i < 100; i++) {
MessageExtBrokerInner messageExtBrokerInner = buildMessage();
messageExtBrokerInner.setTopic(topic);
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
}
Thread.sleep(100);
long secondLastPhyOffset = messageStore.getMaxPhyOffset();
long secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);

MessageExtBrokerInner messageExtBrokerInner = buildMessage();
messageExtBrokerInner.setTopic(topic);
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);

messageStore.shutdown();

//damage last message
damageCommitlog(secondLastPhyOffset);

//reboot
messageStore = buildMessageStore();
load = messageStore.load();
assertTrue(load);
messageStore.start();
assertTrue(secondLastPhyOffset == messageStore.getMaxPhyOffset());
assertTrue(secondLastCqOffset == messageStore.getMaxOffsetInQueue(topic, 0));

//3.damage commitlog and reboot abnormal
for (int i = 0; i < 100; i++) {
messageExtBrokerInner = buildMessage();
messageExtBrokerInner.setTopic(topic);
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
}
Thread.sleep(100);
secondLastPhyOffset = messageStore.getMaxPhyOffset();
secondLastCqOffset = messageStore.getMaxOffsetInQueue(topic, 0);

messageExtBrokerInner = buildMessage();
messageExtBrokerInner.setTopic(topic);
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
messageStore.shutdown();

//damage last message
damageCommitlog(secondLastPhyOffset);
//add abort file
String fileName = StorePathConfigHelper.getAbortFile(((DefaultMessageStore) messageStore).getMessageStoreConfig().getStorePathRootDir());
File file = new File(fileName);
MappedFile.ensureDirOK(file.getParent());
file.createNewFile();

messageStore = buildMessageStore();
load = messageStore.load();
assertTrue(load);
messageStore.start();
assertTrue(secondLastPhyOffset == messageStore.getMaxPhyOffset());
assertTrue(secondLastCqOffset == messageStore.getMaxOffsetInQueue(topic, 0));

//message write again
for (int i = 0; i < 100; i++) {
messageExtBrokerInner = buildMessage();
messageExtBrokerInner.setTopic(topic);
messageExtBrokerInner.setQueueId(0);
messageStore.putMessage(messageExtBrokerInner);
}
}

private void damageCommitlog(long offset) throws Exception {
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
File file = new File(messageStoreConfig.getStorePathCommitLog() + File.separator + "00000000000000000000");

FileChannel fileChannel = new RandomAccessFile(file, "rw").getChannel();
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 1024 * 1024 * 10);

int bodyLen = mappedByteBuffer.getInt((int) offset + 84);
int topicLenIndex = (int) offset + 84 + bodyLen + 4;
mappedByteBuffer.position(topicLenIndex);
mappedByteBuffer.putInt(0);
mappedByteBuffer.putInt(0);
mappedByteBuffer.putInt(0);
mappedByteBuffer.putInt(0);

mappedByteBuffer.force();
fileChannel.force(true);
fileChannel.close();
}

private class MyMessageArrivingListener implements MessageArrivingListener {
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode, long msgStoreTime,
Expand Down

0 comments on commit edf9703

Please sign in to comment.