From 8541ed5aa9e2ba5678fc1055f0a33b0d682eb689 Mon Sep 17 00:00:00 2001 From: Symious Date: Wed, 13 Dec 2023 16:34:44 +0800 Subject: [PATCH 1/3] RATIS-1968. Remove unsed reset --- .../ratis/client/impl/OrderedAsync.java | 7 +++- .../ratis/client/impl/OrderedStreamAsync.java | 4 ++ .../org/apache/ratis/util/SlidingWindow.java | 39 +++++++++++-------- 3 files changed, 32 insertions(+), 18 deletions(-) diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java index a1aa58681c..523744c941 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java @@ -63,6 +63,7 @@ static class PendingOrderedRequest extends PendingClientRequest private final long seqNum; private final AtomicReference> requestConstructor; private volatile boolean isFirst = false; + private volatile long firstSeqNum = 0; PendingOrderedRequest(long callId, long seqNum, Function requestConstructor) { @@ -83,6 +84,10 @@ public void setFirstRequest() { isFirst = true; } + public long getCallId() { + return callId; + } + @Override public long getSeqNum() { return seqNum; @@ -133,7 +138,7 @@ private OrderedAsync(RaftClientImpl client, RaftProperties properties) { } private void resetSlidingWindow(RaftClientRequest request) { - getSlidingWindow(request).resetFirstSeqNum(); + getSlidingWindow(request).resetFirstSeqNum(request.getCallId()); } private SlidingWindow.Client getSlidingWindow(RaftClientRequest request) { diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java index 989c00cbbc..ac91873521 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java @@ -73,6 +73,10 @@ public long getSeqNum() { return seqNum; } + public long getCallId() { + return -1; + } + @Override public void setReply(DataStreamReply dataStreamReply) { replyFuture.complete(dataStreamReply); diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java index 316604db07..b68fd40756 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.util; +import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.exceptions.AlreadyClosedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,8 +26,10 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -56,6 +59,7 @@ interface Request { interface ClientSideRequest extends Request { void setFirstRequest(); + long getCallId(); } interface ServerSideRequest extends Request { @@ -228,13 +232,14 @@ class Client, REPLY> { private final RequestMap requests; /** Delayed requests. */ private final DelayedRequests delayedRequests = new DelayedRequests(); + private final ConcurrentHashMap map = new ConcurrentHashMap<>(); /** The seqNum for the next new request. */ private long nextSeqNum = 1; /** The seqNum of the first request. */ - private long firstSeqNum = -1; + private volatile long firstSeqNum = -1; /** Is the first request replied? */ - private boolean firstReplied; + private volatile boolean firstReplied; /** The exception, if there is any. */ private Throwable exception; @@ -300,6 +305,7 @@ private boolean sendOrDelayRequest(REQUEST request, Consumer sendMethod if (firstReplied) { // already received the reply for the first request, submit any request. + map.put(request.getCallId(), getFirstSeqNum()); sendMethod.accept(request); return true; } @@ -309,6 +315,7 @@ private boolean sendOrDelayRequest(REQUEST request, Consumer sendMethod LOG.debug("{}: detect firstSubmitted {} in {}", requests.getName(), request, this); firstSeqNum = seqNum; request.setFirstRequest(); + map.put(request.getCallId(), getFirstSeqNum()); sendMethod.accept(request); return true; } @@ -333,7 +340,9 @@ public synchronized void retry(REQUEST request, Consumer sendMethod) { private void removeRepliedFromHead() { for (final Iterator i = requests.iterator(); i.hasNext(); i.remove()) { final REQUEST r = i.next(); - if (!r.hasReply()) { + if (r.hasReply()) { + map.remove(r.getCallId()); + } else { return; } } @@ -360,24 +369,16 @@ private void trySendDelayed(Consumer sendMethod) { // after first received, all other requests can be submitted (out-of-order) delayedRequests.getAllAndClear().forEach( seqNum -> sendMethod.accept(requests.getNonRepliedRequest(seqNum, "trySendDelayed"))); - } else { - // Otherwise, submit the first only if it is a delayed request - final Iterator i = requests.iterator(); - if (i.hasNext()) { - final REQUEST r = i.next(); - final Long delayed = delayedRequests.remove(r.getSeqNum()); - if (delayed != null) { - sendOrDelayRequest(r, sendMethod); - } - } } } /** Reset the {@link #firstSeqNum} The stream has an error. */ - public synchronized void resetFirstSeqNum() { - firstSeqNum = -1; - firstReplied = false; - LOG.debug("After resetFirstSeqNum: {}", this); + public synchronized void resetFirstSeqNum(long callId) { + if (callId == -1 || getFirstSeqNum() == map.get(callId)) { + firstSeqNum = -1; + firstReplied = false; + LOG.debug("After resetFirstSeqNum: {}", this); + } } /** Fail all requests starting from the given seqNum. */ @@ -409,6 +410,10 @@ private void alreadyClosed(REQUEST request, Throwable e) { public synchronized boolean isFirst(long seqNum) { return seqNum == (firstSeqNum != -1 ? firstSeqNum : requests.firstSeqNum()); } + + public long getFirstSeqNum() { + return firstSeqNum != -1 ? firstSeqNum : requests.firstSeqNum(); + } } /** From ea6816617c91701d0e796b108f6481573bd38324 Mon Sep 17 00:00:00 2001 From: Symious Date: Wed, 13 Dec 2023 16:38:41 +0800 Subject: [PATCH 2/3] RATIS-1968. Fix checkstyle --- .../main/java/org/apache/ratis/client/impl/OrderedAsync.java | 1 - .../src/main/java/org/apache/ratis/util/SlidingWindow.java | 2 -- 2 files changed, 3 deletions(-) diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java index 523744c941..e17443d661 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java @@ -63,7 +63,6 @@ static class PendingOrderedRequest extends PendingClientRequest private final long seqNum; private final AtomicReference> requestConstructor; private volatile boolean isFirst = false; - private volatile long firstSeqNum = 0; PendingOrderedRequest(long callId, long seqNum, Function requestConstructor) { diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java index b68fd40756..bae2aab9a6 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java @@ -17,7 +17,6 @@ */ package org.apache.ratis.util; -import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.exceptions.AlreadyClosedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,7 +25,6 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; From 9d60a54ea44f115b10e146c5628b3b770e5ff5cc Mon Sep 17 00:00:00 2001 From: Symious Date: Thu, 14 Dec 2023 10:13:25 +0800 Subject: [PATCH 3/3] RATIS-1968. revert else block to handle exceptions slowly --- .../main/java/org/apache/ratis/util/SlidingWindow.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java index bae2aab9a6..e9608d457c 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java @@ -367,6 +367,16 @@ private void trySendDelayed(Consumer sendMethod) { // after first received, all other requests can be submitted (out-of-order) delayedRequests.getAllAndClear().forEach( seqNum -> sendMethod.accept(requests.getNonRepliedRequest(seqNum, "trySendDelayed"))); + } else { + // Otherwise, submit the first only if it is a delayed request + final Iterator i = requests.iterator(); + if (i.hasNext()) { + final REQUEST r = i.next(); + final Long delayed = delayedRequests.remove(r.getSeqNum()); + if (delayed != null) { + sendOrDelayRequest(r, sendMethod); + } + } } }