Skip to content

Commit

Permalink
ConfirmOffset directly takes the max offset when allAckInSyncStateSet…
Browse files Browse the repository at this point in the history
… is false (#7657)
  • Loading branch information
RongtongJin authored Dec 18, 2023
1 parent 086a726 commit 80c0330
Showing 1 changed file with 13 additions and 10 deletions.
23 changes: 13 additions & 10 deletions store/src/main/java/org/apache/rocketmq/store/CommitLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ public boolean getLastMappedFile(final long startOffset) {

/**
* When the normal exit, data recovery, all memory data have been flush
*
* @throws RocksDBException only in rocksdb mode
*/
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) throws RocksDBException {
Expand Down Expand Up @@ -636,7 +637,8 @@ private void setBatchSizeIfNeeded(Map<String, String> propertiesMap, DispatchReq
public long getConfirmOffset() {
if (this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE && !this.defaultMessageStore.getRunningFlags().isFenced()) {
if (((AutoSwitchHAService) this.defaultMessageStore.getHaService()).getLocalSyncStateSet().size() == 1) {
if (((AutoSwitchHAService) this.defaultMessageStore.getHaService()).getLocalSyncStateSet().size() == 1
|| !this.defaultMessageStore.getMessageStoreConfig().isAllAckInSyncStateSet()) {
return this.defaultMessageStore.getMaxPhyOffset();
}
// First time it will compute the confirmOffset.
Expand Down Expand Up @@ -1214,7 +1216,7 @@ public CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatc
}
} catch (RocksDBException e) {
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
} finally {
} finally {
topicQueueLock.unlock(topicQueueKey);
}

Expand Down Expand Up @@ -1840,7 +1842,8 @@ class DefaultAppendMessageCallback implements AppendMessageCallback {
this.messageStoreConfig = messageStoreConfig;
}

public AppendMessageResult handlePropertiesForLmqMsg(ByteBuffer preEncodeBuffer, final MessageExtBrokerInner msgInner) {
public AppendMessageResult handlePropertiesForLmqMsg(ByteBuffer preEncodeBuffer,
final MessageExtBrokerInner msgInner) {
if (msgInner.isEncodeCompleted()) {
return null;
}
Expand All @@ -1850,10 +1853,10 @@ public AppendMessageResult handlePropertiesForLmqMsg(ByteBuffer preEncodeBuffer,
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));

final byte[] propertiesData =
msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);

boolean needAppendLastPropertySeparator = enabledAppendPropCRC && propertiesData != null && propertiesData.length > 0
&& propertiesData[propertiesData.length - 1] != MessageDecoder.PROPERTY_SEPARATOR;
&& propertiesData[propertiesData.length - 1] != MessageDecoder.PROPERTY_SEPARATOR;

final int propertiesLength = (propertiesData == null ? 0 : propertiesData.length) + (needAppendLastPropertySeparator ? 1 : 0) + crc32ReservedLength;

Expand Down Expand Up @@ -2312,7 +2315,7 @@ public boolean isDataInPageCache(final long offset) {
return true;
}

int pos = (int)(offset % defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog());
int pos = (int) (offset % defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog());
int realIndex = pos / pageSize / sampleSteps;
return bytes.length - 1 >= realIndex && bytes[realIndex] != 0;
}
Expand Down Expand Up @@ -2356,8 +2359,8 @@ private byte[] sampling(byte[] pageCacheTable, int sampleStep) {

private byte[] checkFileInPageCache(MappedFile mappedFile) {
long fileSize = mappedFile.getFileSize();
final long address = ((DirectBuffer)mappedFile.getMappedByteBuffer()).address();
int pageNums = (int)(fileSize + this.pageSize - 1) / this.pageSize;
final long address = ((DirectBuffer) mappedFile.getMappedByteBuffer()).address();
int pageNums = (int) (fileSize + this.pageSize - 1) / this.pageSize;
byte[] pageCacheRst = new byte[pageNums];
int mincore = LibC.INSTANCE.mincore(new Pointer(address), new NativeLong(fileSize), pageCacheRst);
if (mincore != 0) {
Expand Down Expand Up @@ -2395,7 +2398,7 @@ public boolean isMsgInColdArea(String group, String topic, int queueId, long off
return false;
}
try {
ConsumeQueue consumeQueue = (ConsumeQueue)defaultMessageStore.findConsumeQueue(topic, queueId);
ConsumeQueue consumeQueue = (ConsumeQueue) defaultMessageStore.findConsumeQueue(topic, queueId);
if (null == consumeQueue) {
return false;
}
Expand Down Expand Up @@ -2433,7 +2436,7 @@ private int setFileReadMode(MappedFile mappedFile, int mode) {
log.error("setFileReadMode mappedFile is null");
return -1;
}
final long address = ((DirectBuffer)mappedFile.getMappedByteBuffer()).address();
final long address = ((DirectBuffer) mappedFile.getMappedByteBuffer()).address();
int madvise = LibC.INSTANCE.madvise(new Pointer(address), new NativeLong(mappedFile.getFileSize()), mode);
if (madvise != 0) {
log.error("setFileReadMode error fileName: {}, madvise: {}, mode:{}", mappedFile.getFileName(), madvise, mode);
Expand Down

0 comments on commit 80c0330

Please sign in to comment.