Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #6149] remove handle when exceed renewMaxTimeMillis #6150

Merged
merged 1 commit into from
Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ public class MessageReceiptHandle {
private final int reconsumeTimes;

private final AtomicInteger renewRetryTimes = new AtomicInteger(0);
private volatile long timestamp;
private volatile long expectInvisibleTime;
private final long consumeTimestamp;
private volatile String receiptHandleStr;

public MessageReceiptHandle(String group, String topic, int queueId, String receiptHandleStr, String messageId,
Expand All @@ -47,8 +46,7 @@ public MessageReceiptHandle(String group, String topic, int queueId, String rece
this.messageId = messageId;
this.queueOffset = queueOffset;
this.reconsumeTimes = reconsumeTimes;
this.expectInvisibleTime = receiptHandle.getInvisibleTime();
this.timestamp = receiptHandle.getRetrieveTime();
this.consumeTimestamp = receiptHandle.getRetrieveTime();
}

@Override
Expand All @@ -60,17 +58,17 @@ public boolean equals(Object o) {
return false;
}
MessageReceiptHandle handle = (MessageReceiptHandle) o;
return queueId == handle.queueId && queueOffset == handle.queueOffset && timestamp == handle.timestamp
&& reconsumeTimes == handle.reconsumeTimes && expectInvisibleTime == handle.expectInvisibleTime
return queueId == handle.queueId && queueOffset == handle.queueOffset && consumeTimestamp == handle.consumeTimestamp
&& reconsumeTimes == handle.reconsumeTimes
&& Objects.equal(group, handle.group) && Objects.equal(topic, handle.topic)
&& Objects.equal(messageId, handle.messageId) && Objects.equal(originalReceiptHandleStr, handle.originalReceiptHandleStr)
&& Objects.equal(receiptHandleStr, handle.receiptHandleStr);
}

@Override
public int hashCode() {
return Objects.hashCode(group, topic, queueId, messageId, queueOffset, originalReceiptHandleStr, timestamp,
reconsumeTimes, expectInvisibleTime, receiptHandleStr);
return Objects.hashCode(group, topic, queueId, messageId, queueOffset, originalReceiptHandleStr, consumeTimestamp,
reconsumeTimes, receiptHandleStr);
}

@Override
Expand All @@ -84,8 +82,7 @@ public String toString() {
.add("originalReceiptHandleStr", originalReceiptHandleStr)
.add("reconsumeTimes", reconsumeTimes)
.add("renewRetryTimes", renewRetryTimes)
.add("timestamp", timestamp)
.add("expectInvisibleTime", expectInvisibleTime)
.add("firstConsumeTimestamp", consumeTimestamp)
.add("receiptHandleStr", receiptHandleStr)
.toString();
}
Expand Down Expand Up @@ -122,19 +119,12 @@ public int getReconsumeTimes() {
return reconsumeTimes;
}

public long getTimestamp() {
return timestamp;
}

public long getExpectInvisibleTime() {
return expectInvisibleTime;
public long getConsumeTimestamp() {
return consumeTimestamp;
}

public void updateReceiptHandle(String receiptHandleStr) {
ReceiptHandle receiptHandle = ReceiptHandle.decode(receiptHandleStr);
this.receiptHandleStr = receiptHandleStr;
this.expectInvisibleTime = receiptHandle.getInvisibleTime();
this.timestamp = receiptHandle.getRetrieveTime();
}

public int incrementAndGetRenewRetryTimes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ protected CompletableFuture<MessageReceiptHandle> startRenewMessage(MessageRecei
log.warn("handle has exceed max renewRetryTimes. handle:{}", messageReceiptHandle);
return CompletableFuture.completedFuture(null);
}
if (current - messageReceiptHandle.getTimestamp() < messageReceiptHandle.getExpectInvisibleTime()) {
if (current - messageReceiptHandle.getConsumeTimestamp() < proxyConfig.getRenewMaxTimeMillis()) {
CompletableFuture<AckResult> future =
messagingProcessor.changeInvisibleTime(context, handle, messageReceiptHandle.getMessageId(),
messageReceiptHandle.getGroup(), messageReceiptHandle.getTopic(), proxyConfig.getRenewSliceTimeMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,11 @@ public void testRenewWithErrorThenOK() {

@Test
public void testRenewReceiptHandleWhenTimeout() {
long newInvisibleTime = 0L;
long newInvisibleTime = 200L;
long maxRenewMs = ConfigurationManager.getProxyConfig().getRenewMaxTimeMillis();
String newReceiptHandle = ReceiptHandle.builder()
.startOffset(0L)
.retrieveTime(0)
.retrieveTime(System.currentTimeMillis() - maxRenewMs)
.invisibleTime(newInvisibleTime)
.reviveQueueId(1)
.topicType(ReceiptHandle.NORMAL_TOPIC)
Expand Down