Skip to content

Commit

Permalink
[ISSUE #6402]opt transaction message check (#6401)
Browse files Browse the repository at this point in the history
* If you specify a custom first check time CheckImmunityTimeInSeconds,And the commit/rollback request whose validity period exceeds CheckImmunityTimeInSeconds and is not checked back will be processed and failed

* remove useless code

* update transactionCheckInterval default to be 30s
  • Loading branch information
Focus-rth authored Mar 21, 2023
1 parent cafc9ac commit c5fec3a
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package org.apache.rocketmq.broker.processor;

import io.netty.channel.ChannelHandlerContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.transaction.OperationResult;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageAccessor;
Expand Down Expand Up @@ -125,6 +127,12 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
if (rejectCommitOrRollback(requestHeader, result.getPrepareMessage())) {
response.setCode(ResponseCode.ILLEGAL_OPERATION);
LOGGER.warn("Message commit fail [producer end]. currentTimeMillis - bornTime > checkImmunityTime, msgId={},commitLogOffset={}, wait check",
requestHeader.getMsgId(), requestHeader.getCommitLogOffset());
return response;
}
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
Expand All @@ -144,6 +152,12 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
if (rejectCommitOrRollback(requestHeader, result.getPrepareMessage())) {
response.setCode(ResponseCode.ILLEGAL_OPERATION);
LOGGER.warn("Message rollback fail [producer end]. currentTimeMillis - bornTime > checkImmunityTime, msgId={},commitLogOffset={}, wait check",
requestHeader.getMsgId(), requestHeader.getCommitLogOffset());
return response;
}
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
Expand All @@ -156,6 +170,30 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand
return response;
}

/**
* If you specify a custom first check time CheckImmunityTimeInSeconds,
* And the commit/rollback request whose validity period exceeds CheckImmunityTimeInSeconds and is not checked back will be processed and failed
* returns ILLEGAL_OPERATION 604 error
* @param requestHeader
* @param messageExt
* @return
*/
public boolean rejectCommitOrRollback(EndTransactionRequestHeader requestHeader, MessageExt messageExt) {
if (requestHeader.getFromTransactionCheck()) {
return false;
}
long transactionTimeout = brokerController.getBrokerConfig().getTransactionTimeOut();

String checkImmunityTimeStr = messageExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
if (StringUtils.isNotEmpty(checkImmunityTimeStr)) {
long valueOfCurrentMinusBorn = System.currentTimeMillis() - messageExt.getBornTimestamp();
long checkImmunityTime = TransactionalMessageUtil.getImmunityTime(checkImmunityTimeStr, transactionTimeout);
//Non-check requests that exceed the specified custom first check time fail to return
return valueOfCurrentMinusBorn > checkImmunityTime;
}
return false;
}

@Override
public boolean rejectRequest() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,20 @@ public static MessageExtBrokerInner buildTransactionalMessageFromHalfMessage(Mes

return msgInner;
}

public static long getImmunityTime(String checkImmunityTimeStr, long transactionTimeout) {
long checkImmunityTime = 0;

try {
checkImmunityTime = Long.parseLong(checkImmunityTimeStr) * 1000;
} catch (Throwable ignored) {
}

//If a custom first check time is set, the minimum check time;
//The default check protection period is transactionTimeout
if (checkImmunityTime < transactionTimeout) {
checkImmunityTime = transactionTimeout;
}
return checkImmunityTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;

import java.nio.charset.StandardCharsets;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -119,6 +121,22 @@ public void testProcessRequest_RollBack() throws RemotingCommandException {
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}

@Test
public void testProcessRequest_RejectCommitMessage() throws RemotingCommandException {
when(transactionMsgService.commitMessage(any(EndTransactionRequestHeader.class))).thenReturn(createRejectResponse());
RemotingCommand request = createEndTransactionMsgCommand(MessageSysFlag.TRANSACTION_COMMIT_TYPE, false);
RemotingCommand response = endTransactionProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.ILLEGAL_OPERATION);
}

@Test
public void testProcessRequest_RejectRollBackMessage() throws RemotingCommandException {
when(transactionMsgService.rollbackMessage(any(EndTransactionRequestHeader.class))).thenReturn(createRejectResponse());
RemotingCommand request = createEndTransactionMsgCommand(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE, false);
RemotingCommand response = endTransactionProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.ILLEGAL_OPERATION);
}

private MessageExt createDefaultMessageExt() {
MessageExt messageExt = new MessageExt();
messageExt.setMsgId("12345678");
Expand Down Expand Up @@ -149,4 +167,27 @@ private RemotingCommand createEndTransactionMsgCommand(int status, boolean isChe
request.makeCustomHeaderToNet();
return request;
}

private OperationResult createRejectResponse() {
OperationResult response = new OperationResult();
response.setPrepareMessage(createRejectMessageExt());
response.setResponseCode(ResponseCode.SUCCESS);
response.setResponseRemark(null);
return response;
}
private MessageExt createRejectMessageExt() {
MessageExt messageExt = new MessageExt();
messageExt.setMsgId("12345678");
messageExt.setQueueId(0);
messageExt.setCommitLogOffset(123456789L);
messageExt.setQueueOffset(1234);
messageExt.setBody("body".getBytes(StandardCharsets.UTF_8));
messageExt.setBornTimestamp(System.currentTimeMillis() - 65 * 1000);
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_REAL_QUEUE_ID, "0");
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_PRODUCER_GROUP, "testTransactionGroup");
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_REAL_TOPIC, "TEST");
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS, "60");
return messageExt;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.junit.Assert;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -50,4 +51,43 @@ public void testBuildTransactionalMessageFromHalfMessage() {
assertTrue(MessageSysFlag.check(msgExtInner.getSysFlag(), MessageSysFlag.TRANSACTION_PREPARED_TYPE));
assertEquals(msgExtInner.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP), halfMessage.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP));
}

@Test
public void testGetImmunityTime() {
long transactionTimeout = 6 * 1000;

String checkImmunityTimeStr = "1";
long immunityTime = TransactionalMessageUtil.getImmunityTime(checkImmunityTimeStr, transactionTimeout);
Assert.assertEquals(6 * 1000, immunityTime);

checkImmunityTimeStr = "5";
immunityTime = TransactionalMessageUtil.getImmunityTime(checkImmunityTimeStr, transactionTimeout);
Assert.assertEquals(6 * 1000, immunityTime);

checkImmunityTimeStr = "7";
immunityTime = TransactionalMessageUtil.getImmunityTime(checkImmunityTimeStr, transactionTimeout);
Assert.assertEquals(7 * 1000, immunityTime);


checkImmunityTimeStr = null;
immunityTime = TransactionalMessageUtil.getImmunityTime(checkImmunityTimeStr, transactionTimeout);
Assert.assertEquals(6 * 1000, immunityTime);

checkImmunityTimeStr = "-1";
immunityTime = TransactionalMessageUtil.getImmunityTime(checkImmunityTimeStr, transactionTimeout);
Assert.assertEquals(6 * 1000, immunityTime);

checkImmunityTimeStr = "60";
immunityTime = TransactionalMessageUtil.getImmunityTime(checkImmunityTimeStr, transactionTimeout);
Assert.assertEquals(60 * 1000, immunityTime);

checkImmunityTimeStr = "100";
immunityTime = TransactionalMessageUtil.getImmunityTime(checkImmunityTimeStr, transactionTimeout);
Assert.assertEquals(100 * 1000, immunityTime);


checkImmunityTimeStr = "100.5";
immunityTime = TransactionalMessageUtil.getImmunityTime(checkImmunityTimeStr, transactionTimeout);
Assert.assertEquals(6 * 1000, immunityTime);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ public class BrokerConfig extends BrokerIdentity {
* Transaction message check interval.
*/
@ImportantField
private long transactionCheckInterval = 60 * 1000;
private long transactionCheckInterval = 30 * 1000;

/**
* transaction batch op message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public class ResponseCode extends RemotingSysResponseCode {

public static final int NOT_LEADER_FOR_QUEUE = 501;

public static final int ILLEGAL_OPERATION = 604;

public static final int RPC_UNKNOWN = -1000;
public static final int RPC_ADDR_IS_NULL = -1002;
public static final int RPC_SEND_TO_CHANNEL_FAILED = -1004;
Expand Down

0 comments on commit c5fec3a

Please sign in to comment.