Skip to content

Commit

Permalink
[ISSUE #6149] remove handle when exceed renewMaxTimeMillis (#6150)
Browse files Browse the repository at this point in the history
  • Loading branch information
xdkxlk authored Feb 21, 2023
1 parent 68ec1af commit 6f11538
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ public class MessageReceiptHandle {
private final int reconsumeTimes;

private final AtomicInteger renewRetryTimes = new AtomicInteger(0);
private volatile long timestamp;
private volatile long expectInvisibleTime;
private final long consumeTimestamp;
private volatile String receiptHandleStr;

public MessageReceiptHandle(String group, String topic, int queueId, String receiptHandleStr, String messageId,
Expand All @@ -47,8 +46,7 @@ public MessageReceiptHandle(String group, String topic, int queueId, String rece
this.messageId = messageId;
this.queueOffset = queueOffset;
this.reconsumeTimes = reconsumeTimes;
this.expectInvisibleTime = receiptHandle.getInvisibleTime();
this.timestamp = receiptHandle.getRetrieveTime();
this.consumeTimestamp = receiptHandle.getRetrieveTime();
}

@Override
Expand All @@ -60,17 +58,17 @@ public boolean equals(Object o) {
return false;
}
MessageReceiptHandle handle = (MessageReceiptHandle) o;
return queueId == handle.queueId && queueOffset == handle.queueOffset && timestamp == handle.timestamp
&& reconsumeTimes == handle.reconsumeTimes && expectInvisibleTime == handle.expectInvisibleTime
return queueId == handle.queueId && queueOffset == handle.queueOffset && consumeTimestamp == handle.consumeTimestamp
&& reconsumeTimes == handle.reconsumeTimes
&& Objects.equal(group, handle.group) && Objects.equal(topic, handle.topic)
&& Objects.equal(messageId, handle.messageId) && Objects.equal(originalReceiptHandleStr, handle.originalReceiptHandleStr)
&& Objects.equal(receiptHandleStr, handle.receiptHandleStr);
}

@Override
public int hashCode() {
return Objects.hashCode(group, topic, queueId, messageId, queueOffset, originalReceiptHandleStr, timestamp,
reconsumeTimes, expectInvisibleTime, receiptHandleStr);
return Objects.hashCode(group, topic, queueId, messageId, queueOffset, originalReceiptHandleStr, consumeTimestamp,
reconsumeTimes, receiptHandleStr);
}

@Override
Expand All @@ -84,8 +82,7 @@ public String toString() {
.add("originalReceiptHandleStr", originalReceiptHandleStr)
.add("reconsumeTimes", reconsumeTimes)
.add("renewRetryTimes", renewRetryTimes)
.add("timestamp", timestamp)
.add("expectInvisibleTime", expectInvisibleTime)
.add("firstConsumeTimestamp", consumeTimestamp)
.add("receiptHandleStr", receiptHandleStr)
.toString();
}
Expand Down Expand Up @@ -122,19 +119,12 @@ public int getReconsumeTimes() {
return reconsumeTimes;
}

public long getTimestamp() {
return timestamp;
}

public long getExpectInvisibleTime() {
return expectInvisibleTime;
public long getConsumeTimestamp() {
return consumeTimestamp;
}

public void updateReceiptHandle(String receiptHandleStr) {
ReceiptHandle receiptHandle = ReceiptHandle.decode(receiptHandleStr);
this.receiptHandleStr = receiptHandleStr;
this.expectInvisibleTime = receiptHandle.getInvisibleTime();
this.timestamp = receiptHandle.getRetrieveTime();
}

public int incrementAndGetRenewRetryTimes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ protected CompletableFuture<MessageReceiptHandle> startRenewMessage(MessageRecei
log.warn("handle has exceed max renewRetryTimes. handle:{}", messageReceiptHandle);
return CompletableFuture.completedFuture(null);
}
if (current - messageReceiptHandle.getTimestamp() < messageReceiptHandle.getExpectInvisibleTime()) {
if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) {
CompletableFuture<AckResult> future =
messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(),
messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), proxyConfig.getRenewSliceTimeMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,11 @@ public void testRenewWithErrorThenOK() {

@Test
public void testRenewReceiptHandleWhenTimeout() {
long newInvisibleTime = 0L;
long newInvisibleTime = 200L;
long maxRenewMs = ConfigurationManager.getProxyConfig().getRenewMaxTimeMillis();
String newReceiptHandle = ReceiptHandle.builder()
.startOffset(0L)
.retrieveTime(0)
.retrieveTime(System.currentTimeMillis() - maxRenewMs)
.invisibleTime(newInvisibleTime)
.reviveQueueId(1)
.topicType(ReceiptHandle.NORMAL_TOPIC)
Expand Down

0 comments on commit 6f11538

Please sign in to comment.