Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RATIS-2242 change consistency criteria of heartbeat during appendLog #1215

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

SzyWilliam
Copy link
Member

What changes were proposed in this pull request?

When enable both appendLog channel and heartbeat channel,

Leader:

appendLog will send an AppendEntries RPC with (previous = nextIndex0, entries = [e1,e2...]), then update the follower nextIndex to nextIndex1
Subsequent heartbeat channel will send AppendEntries RPCs with (previous = nextIndex1, entries = empty)
Follower:

Inconsistency reply will be triggered when handling the heartbeats by
https://github.com/apache/ratis/blob/8353a017fe6545fbfb74960ecb3a0f4396c478d2/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java#L1720-1724

What is the link to the Apache JIRA

https://issues.apache.org/jira/projects/RATIS/issues/RATIS-2242?filter=allissues

How was this patch tested?

Unit tests

Copy link
Contributor

@szetszwo szetszwo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@SzyWilliam , thanks for finding out the problem and working on this! Please see the comment inlined.

@@ -1717,7 +1717,7 @@ private long checkInconsistentAppendEntries(TermIndex previous, List<LogEntryPro
}

// Check if "previous" is contained in current state.
if (previous != null && !state.containsTermIndex(previous)) {
if (previous != null && !state.containsTermIndex(previous) && appendLogFuture.get().isDone()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will disable inconsistent check when the server is busy (appendLogFuture.get().isDone() will mostly false). We need to remember the entries being appended to the log; see https://issues.apache.org/jira/secure/attachment/13074189/1215_review.patch

@SzyWilliam
Copy link
Member Author

@szetszwo Thanks very much for the patch! I’m really impressed by how clean and intuitive the design of remembering the entries. It's always exciting to enjoy the art of programming.
Applied changes suggested by the patch, ptal ;-)

Copy link
Contributor

@szetszwo szetszwo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 the change looks good.

@szetszwo
Copy link
Contributor

It could throw IllegalStateException. Then, the ref is retained but not released.

java.lang.IllegalStateException: index0: expected == 2 but computed == 0
	at org.apache.ratis.util.Preconditions.assertTrue(Preconditions.java:77)
	at org.apache.ratis.util.Preconditions.assertSame(Preconditions.java:87)
	at org.apache.ratis.server.impl.ServerImplUtils$NavigableIndices.append(ServerImplUtils.java:145)
	at org.apache.ratis.server.impl.RaftServerImpl.appendLog(RaftServerImpl.java:1695)
	at org.apache.ratis.server.impl.RaftServerImpl.appendEntriesAsync(RaftServerImpl.java:1660)
	at org.apache.ratis.server.impl.RaftServerImpl.appendEntriesAsync(RaftServerImpl.java:1547)
	at org.apache.ratis.server.impl.RaftServerProxy.lambda$null$28(RaftServerProxy.java:656)
	at org.apache.ratis.util.JavaUtils.callAsUnchecked(JavaUtils.java:118)
	at org.apache.ratis.server.impl.RaftServerImpl.lambda$executeSubmitServerRequestAsync$10(RaftServerImpl.java:936)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

There seems to have two async calls, appendEntries1 and then appendEntries2. The failure happens when appendEntries2 runs faster than appendEntries1.

@szetszwo
Copy link
Contributor

gRPC calls onNext one-by-one but our RaftServerImpl.executeSubmitServerRequestAsync method uses CompletableFuture.supplyAsync causing the async call problem.

Copy link
Contributor

@szetszwo szetszwo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@SzyWilliam , thanks for the update!

As mentioned in my previous comments, we should not use executeSubmitServerRequestAsync.

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 6b41c8c2a..523a41833 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -653,7 +653,8 @@ class RaftServerProxy implements RaftServer {
     try {
       final RaftGroupId groupId = ProtoUtils.toRaftGroupId(request.getServerRequest().getRaftGroupId());
       return getImplFuture(groupId)
-          .thenCompose(impl -> impl.executeSubmitServerRequestAsync(() -> impl.appendEntriesAsync(requestRef)));
+          .thenCompose(impl -> JavaUtils.callAsUnchecked(
+              () -> impl.appendEntriesAsync(requestRef), CompletionException::new));
     } finally {
       requestRef.release();
     }

Comment on lines +141 to +147
// validate index0
final long index0 = indices.startIndex;
final Map.Entry<Long, ConsecutiveIndices> lastEntry = map.lastEntry();
if (lastEntry != null) {
Preconditions.assertSame(lastEntry.getValue().getNextIndex(), index0, "index0");
}
map.put(index0, indices);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove index0.

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index 976a05008..c5010a534 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -138,13 +138,12 @@ public final class ServerImplUtils {
 
     synchronized void append(List<ConsecutiveIndices> entriesTermIndices) {
       for(ConsecutiveIndices indices : entriesTermIndices) {
-        // validate index0
-        final long index0 = indices.startIndex;
+        // validate startIndex
         final Map.Entry<Long, ConsecutiveIndices> lastEntry = map.lastEntry();
         if (lastEntry != null) {
-          Preconditions.assertSame(lastEntry.getValue().getNextIndex(), index0, "index0");
+          Preconditions.assertSame(lastEntry.getValue().getNextIndex(), indices.startIndex, "startIndex");
         }
-        map.put(index0, indices);
+        map.put(indices.startIndex, indices);
       }
     }

Comment on lines -1690 to +1696
entriesRef.retain();
final List<LogEntryProto> entries = entriesRef.retain();
final List<ConsecutiveIndices> entriesTermIndices = ConsecutiveIndices.convert(entries);
appendLogTermIndices.append(entriesTermIndices);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We actually need to retain twice, one for sync and another one for async.

  private CompletableFuture<Void> appendLog(ReferenceCountedObject<List<LogEntryProto>> entriesRef) {
    final List<ConsecutiveIndices> entriesTermIndices;
    try(UncheckedAutoCloseableSupplier<List<LogEntryProto>> entries =  entriesRef.retainAndReleaseOnClose()) {
      entriesTermIndices = ConsecutiveIndices.convert(entries.get());
      appendLogTermIndices.append(entriesTermIndices);
    }

    entriesRef.retain();
    return appendLogFuture.updateAndGet(f -> f.thenCompose(
        ...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants