Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,21 @@ GroupInfoReply getGroupInfo(GroupInfoRequest request) {
getGroup(), getRoleInfoProto(), state.getStorage().getStorageDir().isHealthy());
}

private RoleInfoProto getRoleInfoProto(RaftPeer leaderPeerInfo) {
RaftPeerRole currentRole = role.getCurrentRole();
RoleInfoProto.Builder roleInfo = RoleInfoProto.newBuilder()
.setSelf(getPeer().getRaftPeerProto())
.setRole(currentRole)
.setRoleElapsedTimeMs(role.getRoleElapsedTimeMs());
final Optional<FollowerState> fs = role.getFollowerState();
final ServerRpcProto leaderInfo =
ServerProtoUtils.toServerRpcProto(leaderPeerInfo,
fs.map(FollowerState::getLastRpcTime).map(Timestamp::elapsedTimeMs).orElse(0L));
roleInfo.setFollowerInfo(FollowerInfoProto.newBuilder().setLeaderInfo(leaderInfo)
.setOutstandingOp(fs.map(FollowerState::getOutstandingOp).orElse(0)));
return roleInfo.build();
}

RoleInfoProto getRoleInfoProto() {
RaftPeerRole currentRole = role.getCurrentRole();
RoleInfoProto.Builder roleInfo = RoleInfoProto.newBuilder()
Expand Down Expand Up @@ -1545,13 +1560,27 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
return reply;
}

Optional<RaftPeerProto> leaderPeerInfo = null;
if (request.hasLastRaftConfigurationLogEntryProto()) {
List<RaftPeerProto> peerList = request.getLastRaftConfigurationLogEntryProto().getConfigurationEntry()
.getPeersList();
leaderPeerInfo = peerList.stream().filter(p -> RaftPeerId.valueOf(p.getId()).equals(leaderId)).findFirst();
Preconditions.assertTrue(leaderPeerInfo.isPresent());
}

// For the cases where RaftConf is empty on newly started peer with
// empty peer list, we retrieve leader info from
// installSnapShotRequestProto.
RoleInfoProto roleInfoProto =
getRaftConf().getPeer(state.getLeaderId()) == null ?
getRoleInfoProto(ProtoUtils.toRaftPeer(leaderPeerInfo.get())) :
getRoleInfoProto();
// This is the first installSnapshot notify request for this term and
// index. Notify the state machine to install the snapshot.
LOG.info("{}: notifyInstallSnapshot: nextIndex is {} but the leader's first available index is {}.",
getMemberId(), state.getLog().getNextIndex(), firstAvailableLogIndex);

try {
stateMachine.followerEvent().notifyInstallSnapshotFromLeader(getRoleInfoProto(), firstAvailableLogTermIndex)
stateMachine.followerEvent().notifyInstallSnapshotFromLeader(roleInfoProto, firstAvailableLogTermIndex)
.whenComplete((reply, exception) -> {
if (exception != null) {
LOG.warn("{}: Failed to notify StateMachine to InstallSnapshot. Exception: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ private void testAddNewFollowers(CLUSTER cluster) throws Exception {
Assert.assertTrue(set);

// add two more peers
final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true);
final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
true);
// trigger setConfiguration
cluster.setConfiguration(change.allPeersInNewConf);

Expand Down Expand Up @@ -317,7 +318,8 @@ private void testInstallSnapshotNotificationCount(CLUSTER cluster) throws Except
}

// Add two more peers who will need snapshots from the leader.
final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true);
final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
true);
// trigger setConfiguration
cluster.setConfiguration(change.allPeersInNewConf);
RaftServerTestUtil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ void runTestNotLeaderExceptionWithReconf(CLUSTER cluster) throws Exception {
final RaftPeerId newLeader = RaftTestUtil.changeLeader(cluster, oldLeader);

// add two more peers
MiniRaftCluster.PeerChanges change = cluster.addNewPeers(new String[]{"ss1", "ss2"}, true);
MiniRaftCluster.PeerChanges change = cluster.addNewPeers(new String[]{
"ss1", "ss2"}, true, false);
// trigger setConfiguration
LOG.info("Start changing the configuration: {}", Arrays.asList(change.allPeersInNewConf));
try (final RaftClient c2 = cluster.createClient(newLeader)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ public RaftProperties getProperties() {
public MiniRaftCluster initServers() {
LOG.info("servers = " + servers);
if (servers.isEmpty()) {
putNewServers(CollectionUtils.as(group.getPeers(), RaftPeer::getId), true);
putNewServers(CollectionUtils.as(group.getPeers(), RaftPeer::getId),
true, false);
}
return this;
}
Expand All @@ -296,10 +297,18 @@ public RaftServerProxy putNewServer(RaftPeerId id, RaftGroup group, boolean form
}

private Collection<RaftServer> putNewServers(
Iterable<RaftPeerId> peers, boolean format) {
return StreamSupport.stream(peers.spliterator(), false)
.map(id -> putNewServer(id, group, format))
.collect(Collectors.toList());
Iterable<RaftPeerId> peers, boolean format, boolean emptyPeer) {
if (emptyPeer) {
RaftGroup raftGroup = RaftGroup.valueOf(group.getGroupId(),
Collections.EMPTY_LIST);
return StreamSupport.stream(peers.spliterator(), false)
.map(id -> putNewServer(id, raftGroup, format))
.collect(Collectors.toList());
} else {
return StreamSupport.stream(peers.spliterator(), false)
.map(id -> putNewServer(id, group, format))
.collect(Collectors.toList());
}
}

public void start() throws IOException {
Expand Down Expand Up @@ -337,7 +346,7 @@ public void restart(boolean format) throws IOException {

List<RaftPeerId> idList = new ArrayList<>(servers.keySet());
servers.clear();
putNewServers(idList, format);
putNewServers(idList, format, false);
start();
}

Expand Down Expand Up @@ -406,15 +415,21 @@ private static List<RaftPeer> toRaftPeers(Iterable<RaftServer> servers) {

public PeerChanges addNewPeers(int number, boolean startNewPeer)
throws IOException {
return addNewPeers(generateIds(number, servers.size()), startNewPeer);
return addNewPeers(generateIds(number, servers.size()), startNewPeer, false);
}

public PeerChanges addNewPeers(int number, boolean startNewPeer,
boolean emptyPeer) throws IOException {
return addNewPeers(generateIds(number, servers.size()), startNewPeer, emptyPeer);
}

public PeerChanges addNewPeers(String[] ids, boolean startNewPeer) throws IOException {
public PeerChanges addNewPeers(String[] ids, boolean startNewPeer,
boolean emptyPeer) throws IOException {
LOG.info("Add new peers {}", Arrays.asList(ids));

// create and add new RaftServers
final Collection<RaftServer> newServers = putNewServers(
CollectionUtils.as(Arrays.asList(ids), RaftPeerId::valueOf), true);
CollectionUtils.as(Arrays.asList(ids), RaftPeerId::valueOf), true, emptyPeer);

startServers(newServers);
if (!startNewPeer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,8 @@ void runTestBootstrapReconf(int numNewPeer, boolean startNewPeer, CLUSTER cluste
}
FIVE_SECONDS.sleep();
LOG.info(cluster.printServers());
assertSuccess(success);

RaftTestUtil.waitFor(() -> success.get(), 300, 15000);

final RaftLog leaderLog = cluster.getLeader().getRaftLog();
for (RaftPeer newPeer : c1.newPeers) {
Expand Down Expand Up @@ -452,12 +453,6 @@ void runTestKillLeaderDuringReconf(CLUSTER cluster) throws Exception {
}
}

static void assertSuccess(final AtomicReference<Boolean> success) {
final String s = "success=" + success;
Assert.assertNotNull(s, success.get());
Assert.assertTrue(s, success.get());
}

/**
* When a request's new configuration is the same with the current one, make
* sure we return success immediately and no log entry is recorded.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public void testBasicInstallSnapshot() throws Exception {
// add two more peers
String[] newPeers = new String[]{"s3", "s4"};
MiniRaftCluster.PeerChanges change = cluster.addNewPeers(
newPeers, true);
newPeers, true, false);
// trigger setConfiguration
cluster.setConfiguration(change.allPeersInNewConf);

Expand Down