diff --git a/server/raft.go b/server/raft.go index ee93a80142a..d25aba38d25 100644 --- a/server/raft.go +++ b/server/raft.go @@ -2729,7 +2729,6 @@ func (n *raft) runAsLeader() { n.stepdown(noLeader) return } - n.trackPeer(vresp.peer) case <-n.reqs.ch: // Because of drain() it is possible that we get nil from popOne(). if voteReq, ok := n.reqs.popOne(); ok { @@ -3093,7 +3092,7 @@ func (n *raft) applyCommit(index uint64) error { if lp, ok := n.peers[newPeer]; !ok { // We are not tracking this one automatically so we need to bump cluster size. - n.peers[newPeer] = &lps{time.Now(), 0, true} + n.peers[newPeer] = &lps{time.Time{}, 0, true} } else { // Mark as added. lp.kp = true @@ -3493,6 +3492,17 @@ func (n *raft) updateLeader(newLeader string) { } } } + // Reset last seen timestamps. + // If we're the leader we track everyone, and don't reset. + // But if we're a follower we only track the leader, and reset all others. + if newLeader != n.id { + for peer, ps := range n.peers { + if peer == newLeader { + continue + } + ps.ts = time.Time{} + } + } } // processAppendEntry will process an appendEntry. This is called either @@ -3583,15 +3593,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { // sub, rather than a catch-up sub. isNew := sub != nil && sub == n.aesub - // Track leader directly - if isNew && ae.leader != noLeader { - if ps := n.peers[ae.leader]; ps != nil { - ps.ts = time.Now() - } else { - n.peers[ae.leader] = &lps{time.Now(), 0, true} - } - } - // If we are/were catching up ignore old catchup subs, but only if catching up from an older server // that doesn't send the leader term when catching up. We can reject old catchups from newer subs // later, just by checking the append entry is on the correct term. @@ -3663,6 +3664,16 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.updateLeadChange(false) } + // Track leader directly + // But, do so after all consistency checks so we don't track an old leader. 
+ if isNew && ae.leader != noLeader && ae.leader == n.leader { + if ps := n.peers[ae.leader]; ps != nil { + ps.ts = time.Now() + } else { + n.peers[ae.leader] = &lps{time.Now(), 0, true} + } + } + if ae.pterm != n.pterm || ae.pindex != n.pindex { // Check if this is a lower or equal index than what we were expecting. if ae.pindex <= n.pindex { @@ -3831,10 +3842,8 @@ CONTINUE: case EntryAddPeer: if newPeer := string(e.Data); len(newPeer) == idLen { // Track directly, but wait for commit to be official - if ps := n.peers[newPeer]; ps != nil { - ps.ts = time.Now() - } else { - n.peers[newPeer] = &lps{time.Now(), 0, false} + if _, ok := n.peers[newPeer]; !ok { + n.peers[newPeer] = &lps{time.Time{}, 0, false} } // Store our peer in our global peer map for all peers. peers.LoadOrStore(newPeer, newPeer) @@ -4385,10 +4394,6 @@ func (n *raft) processVoteRequest(vr *voteRequest) error { } n.debug("Received a voteRequest %+v", vr) - if err := n.trackPeer(vr.candidate); err != nil { - return err - } - n.Lock() vresp := &voteResponse{n.term, n.id, false, n.pindex == 0} diff --git a/server/raft_test.go b/server/raft_test.go index 27125e368de..86db4fdf7c3 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -3620,6 +3620,82 @@ func TestNRGDrainAndReplaySnapshot(t *testing.T) { require_Equal(t, n.hcommit, 0) } +func TestNRGTrackPeerActive(t *testing.T) { + // The leader should track timestamps for all peers. + // Each follower should only track the leader; otherwise we would get outdated timestamps.
+ checkLastSeen := func(peers map[string]RaftzGroupPeer) { + for _, peer := range peers { + if peer.LastSeen == _EMPTY_ { + continue + } + elapsed, err := time.ParseDuration(peer.LastSeen) + require_NoError(t, err) + require_LessThan(t, elapsed, time.Second) + } + } + + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + ml := c.leader() + rs := c.randomNonLeader() + var preferred *Server + for _, s := range c.servers { + if s == ml || s == rs { + continue + } + preferred = s + break + } + require_NotNil(t, preferred) + + time.Sleep(2 * time.Second) + before := (*rs.Raftz(&RaftzOptions{}))[DEFAULT_SYSTEM_ACCOUNT][defaultMetaGroupName].Peers + checkLastSeen(before) + + js := ml.getJetStream() + n := js.getMetaGroup() + require_NoError(t, n.StepDown(preferred.NodeName())) + + time.Sleep(2 * time.Second) + after := (*rs.Raftz(&RaftzOptions{}))[DEFAULT_SYSTEM_ACCOUNT][defaultMetaGroupName].Peers + checkLastSeen(after) +} + +func TestNRGLostQuorum(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + require_Equal(t, n.State(), Follower) + require_False(t, n.Quorum()) + require_True(t, n.lostQuorum()) + + nc, err := nats.Connect(n.s.ClientURL(), nats.UserInfo("admin", "s3cr3t!")) + require_NoError(t, err) + defer nc.Close() + + // Respond to a vote request. + sub, err := nc.Subscribe(n.vsubj, func(m *nats.Msg) { + req := decodeVoteRequest(m.Data, m.Reply) + resp := voteResponse{term: req.term, peer: "random", granted: true} + m.Respond(resp.encode()) + }) + require_NoError(t, err) + defer sub.Drain() + require_NoError(t, nc.Flush()) + + // Switch to candidate and make sure we properly track the peer as active. 
+ n.switchToCandidate() + require_Equal(t, n.State(), Candidate) + require_False(t, n.Quorum()) + require_True(t, n.lostQuorum()) + + n.runAsCandidate() + require_Equal(t, n.State(), Leader) + require_True(t, n.Quorum()) + require_False(t, n.lostQuorum()) +} + // This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before // proposing the next one. // The test may fail if: