diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 01046f9609..ccead21dcd 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -512,6 +512,21 @@ GroupInfoReply getGroupInfo(GroupInfoRequest request) { getGroup(), getRoleInfoProto(), state.getStorage().getStorageDir().isHealthy()); } + private RoleInfoProto getRoleInfoProto(RaftPeer leaderPeerInfo) { + RaftPeerRole currentRole = role.getCurrentRole(); + RoleInfoProto.Builder roleInfo = RoleInfoProto.newBuilder() + .setSelf(getPeer().getRaftPeerProto()) + .setRole(currentRole) + .setRoleElapsedTimeMs(role.getRoleElapsedTimeMs()); + final Optional fs = role.getFollowerState(); + final ServerRpcProto leaderInfo = + ServerProtoUtils.toServerRpcProto(leaderPeerInfo, + fs.map(FollowerState::getLastRpcTime).map(Timestamp::elapsedTimeMs).orElse(0L)); + roleInfo.setFollowerInfo(FollowerInfoProto.newBuilder().setLeaderInfo(leaderInfo) + .setOutstandingOp(fs.map(FollowerState::getOutstandingOp).orElse(0))); + return roleInfo.build(); + } + RoleInfoProto getRoleInfoProto() { RaftPeerRole currentRole = role.getCurrentRole(); RoleInfoProto.Builder roleInfo = RoleInfoProto.newBuilder() @@ -1545,13 +1560,27 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot( return reply; } + Optional leaderPeerInfo = null; + if (request.hasLastRaftConfigurationLogEntryProto()) { + List peerList = request.getLastRaftConfigurationLogEntryProto().getConfigurationEntry() + .getPeersList(); + leaderPeerInfo = peerList.stream().filter(p -> RaftPeerId.valueOf(p.getId()).equals(leaderId)).findFirst(); + Preconditions.assertTrue(leaderPeerInfo.isPresent()); + } + + // For the cases where RaftConf is empty on newly started peer with + // empty peer list, we retrieve leader info from + // installSnapShotRequestProto. + RoleInfoProto roleInfoProto = + getRaftConf().getPeer(state.getLeaderId()) == null ? + getRoleInfoProto(ProtoUtils.toRaftPeer(leaderPeerInfo.get())) : + getRoleInfoProto(); // This is the first installSnapshot notify request for this term and // index. Notify the state machine to install the snapshot. LOG.info("{}: notifyInstallSnapshot: nextIndex is {} but the leader's first available index is {}.", getMemberId(), state.getLog().getNextIndex(), firstAvailableLogIndex); - try { - stateMachine.followerEvent().notifyInstallSnapshotFromLeader(getRoleInfoProto(), firstAvailableLogTermIndex) + stateMachine.followerEvent().notifyInstallSnapshotFromLeader(roleInfoProto, firstAvailableLogTermIndex) .whenComplete((reply, exception) -> { if (exception != null) { LOG.warn("{}: Failed to notify StateMachine to InstallSnapshot. Exception: {}", diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java index c690ea1933..675ee55272 100644 --- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java @@ -175,7 +175,8 @@ private void testAddNewFollowers(CLUSTER cluster) throws Exception { Assert.assertTrue(set); // add two more peers - final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true); + final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true, + true); // trigger setConfiguration cluster.setConfiguration(change.allPeersInNewConf); @@ -317,7 +318,8 @@ private void testInstallSnapshotNotificationCount(CLUSTER cluster) throws Except } // Add two more peers who will need snapshots from the leader. - final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true); + final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true, + true); // trigger setConfiguration cluster.setConfiguration(change.allPeersInNewConf); RaftServerTestUtil diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java index 5a519976fa..09b057b9a5 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java @@ -109,7 +109,8 @@ void runTestNotLeaderExceptionWithReconf(CLUSTER cluster) throws Exception { final RaftPeerId newLeader = RaftTestUtil.changeLeader(cluster, oldLeader); // add two more peers - MiniRaftCluster.PeerChanges change = cluster.addNewPeers(new String[]{"ss1", "ss2"}, true); + MiniRaftCluster.PeerChanges change = cluster.addNewPeers(new String[]{ + "ss1", "ss2"}, true, false); // trigger setConfiguration LOG.info("Start changing the configuration: {}", Arrays.asList(change.allPeersInNewConf)); try (final RaftClient c2 = cluster.createClient(newLeader)) { diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java index 2d2c219e88..955bb42f58 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java @@ -283,7 +283,8 @@ public RaftProperties getProperties() { public MiniRaftCluster initServers() { LOG.info("servers = " + servers); if (servers.isEmpty()) { - putNewServers(CollectionUtils.as(group.getPeers(), RaftPeer::getId), true); + putNewServers(CollectionUtils.as(group.getPeers(), RaftPeer::getId), + true, false); } return this; } @@ -296,10 +297,18 @@ public RaftServerProxy putNewServer(RaftPeerId id, RaftGroup group, boolean form } private Collection putNewServers( - Iterable peers, boolean format) { - return StreamSupport.stream(peers.spliterator(), false) - .map(id -> putNewServer(id, group, format)) - .collect(Collectors.toList()); + Iterable peers, boolean format, boolean emptyPeer) { + if (emptyPeer) { + RaftGroup raftGroup = RaftGroup.valueOf(group.getGroupId(), + Collections.EMPTY_LIST); + return StreamSupport.stream(peers.spliterator(), false) + .map(id -> putNewServer(id, raftGroup, format)) + .collect(Collectors.toList()); + } else { + return StreamSupport.stream(peers.spliterator(), false) + .map(id -> putNewServer(id, group, format)) + .collect(Collectors.toList()); + } } public void start() throws IOException { @@ -337,7 +346,7 @@ public void restart(boolean format) throws IOException { List idList = new ArrayList<>(servers.keySet()); servers.clear(); - putNewServers(idList, format); + putNewServers(idList, format, false); start(); } @@ -406,15 +415,21 @@ private static List toRaftPeers(Iterable servers) { public PeerChanges addNewPeers(int number, boolean startNewPeer) throws IOException { - return addNewPeers(generateIds(number, servers.size()), startNewPeer); + return addNewPeers(generateIds(number, servers.size()), startNewPeer, false); + } + + public PeerChanges addNewPeers(int number, boolean startNewPeer, + boolean emptyPeer) throws IOException { + return addNewPeers(generateIds(number, servers.size()), startNewPeer, emptyPeer); } - public PeerChanges addNewPeers(String[] ids, boolean startNewPeer) throws IOException { + public PeerChanges addNewPeers(String[] ids, boolean startNewPeer, + boolean emptyPeer) throws IOException { LOG.info("Add new peers {}", Arrays.asList(ids)); // create and add new RaftServers final Collection newServers = putNewServers( - CollectionUtils.as(Arrays.asList(ids), RaftPeerId::valueOf), true); + CollectionUtils.as(Arrays.asList(ids), RaftPeerId::valueOf), true, emptyPeer); startServers(newServers); if (!startNewPeer) { diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java index 6e7522435d..22b60daabf 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java @@ -361,7 +361,8 @@ void runTestBootstrapReconf(int numNewPeer, boolean startNewPeer, CLUSTER cluste } FIVE_SECONDS.sleep(); LOG.info(cluster.printServers()); - assertSuccess(success); + + RaftTestUtil.waitFor(() -> success.get(), 300, 15000); final RaftLog leaderLog = cluster.getLeader().getRaftLog(); for (RaftPeer newPeer : c1.newPeers) { @@ -452,12 +453,6 @@ void runTestKillLeaderDuringReconf(CLUSTER cluster) throws Exception { } } - static void assertSuccess(final AtomicReference success) { - final String s = "success=" + success; - Assert.assertNotNull(s, success.get()); - Assert.assertTrue(s, success.get()); - } - /** * When a request's new configuration is the same with the current one, make * sure we return success immediately and no log entry is recorded. diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java index 714ff68979..9837fe3783 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java @@ -230,7 +230,7 @@ public void testBasicInstallSnapshot() throws Exception { // add two more peers String[] newPeers = new String[]{"s3", "s4"}; MiniRaftCluster.PeerChanges change = cluster.addNewPeers( - newPeers, true); + newPeers, true, false); // trigger setConfiguration cluster.setConfiguration(change.allPeersInNewConf);