diff --git a/server/raft.go b/server/raft.go
index 87e48d9e693..52a1cbf6085 100644
--- a/server/raft.go
+++ b/server/raft.go
@@ -2061,13 +2061,34 @@ func (n *raft) Peers() []*Peer {
 	var peers []*Peer
 	for id, ps := range n.peers {
+		var current bool
 		var lag uint64
-		if n.commit > ps.li {
-			lag = n.commit - ps.li
+		if id == n.id {
+			// We are current and have no lag when compared with ourselves.
+			current = true
+		} else if n.id == n.leader {
+			// We are the leader, so we know how many entries this replica has persisted.
+			// Lag is determined by how many entries we have quorum on in our log that haven't yet
+			// been persisted on the replica. They are current if there's no lag.
+			// This will show all peers that are part of quorum as "current".
+			if n.commit > ps.li {
+				lag = n.commit - ps.li
+			}
+			current = lag == 0
+		} else if id == n.leader {
+			// This peer is the leader. We don't know our lag, but we can report
+			// on whether we've seen the leader recently.
+			okInterval := hbInterval * 2
+			current = time.Since(ps.ts) <= okInterval
+		} else {
+			// The remaining case is another follower that we're not in contact with.
+			// We intentionally leave current and lag at their zero values.
+			current, lag = false, 0
 		}
+
 		p := &Peer{
 			ID:      id,
-			Current: id == n.leader || ps.li >= n.applied,
+			Current: current,
 			Last:    ps.ts,
 			Lag:     lag,
 		}
 		peers = append(peers, p)
@@ -3838,14 +3859,17 @@ func (n *raft) updateLeader(newLeader string) {
 			}
 		}
 	}
-	// Reset last seen timestamps.
+	// Reset last seen timestamps and indices.
 	// If we are (or were) the leader we track(ed) everyone, and don't reset.
 	// But if we're a follower we only track the leader, and reset all others.
 	if newLeader != n.id && !wasLeader {
 		for peer, ps := range n.peers {
+			// Always reset the last replicated index.
+			ps.li = 0
 			if peer == newLeader {
 				continue
 			}
+			// Only reset the last seen timestamp if this peer is not the leader.
 			ps.ts = time.Time{}
 		}
 	}
diff --git a/server/raft_test.go b/server/raft_test.go
index 19420c1ae53..d09d2839956 100644
--- a/server/raft_test.go
+++ b/server/raft_test.go
@@ -5304,3 +5304,139 @@ func TestNRGReplayAddPeerKeepsClusterSize(t *testing.T) {
 	n.WaitForStop()
 	fs.Stop()
 }
+
+func TestNRGTrackPeerLag(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	rg := c.createRaftGroup("TEST", 3, newStateAdder)
+	lsm := rg.waitOnLeader()
+	require_NotNil(t, lsm)
+	ls := lsm.(*stateAdder)
+
+	checkPeerData := func() {
+		for _, sm := range rg {
+			rn := sm.node().(*raft)
+			rn.RLock()
+			leader := rn.leader
+			isLeader := RaftState(rn.state.Load()) == Leader
+			for id, ps := range rn.peers {
+				// If we are the leader, these values must be populated for every peer but ourselves.
+				if isLeader && id != rn.id {
+					if li := ps.li; li == 0 {
+						rn.RUnlock()
+						t.Fatal("require last replicated index to be set")
+					}
+					if ts := ps.ts; ts.IsZero() {
+						rn.RUnlock()
+						t.Fatal("require last seen timestamp to be set")
+					}
+					continue
+				}
+				if li := ps.li; li != 0 {
+					rn.RUnlock()
+					t.Fatalf("require last replicated index to be zero, but got: %v", li)
+				}
+				if ts := ps.ts; id != rn.leader && !ts.IsZero() {
+					rn.RUnlock()
+					t.Fatalf("require zero ts, but got: %v", ts)
+				}
+			}
+			rn.RUnlock()
+
+			// This test expects no lag and all nodes as current (except for followers as seen by other followers).
+			for _, ps := range rn.Peers() {
+				require_Equal(t, ps.Lag, 0)
+				require_Equal(t, ps.Current, ps.ID == rn.id || ps.ID == leader || isLeader)
+			}
+		}
+	}
+
+	for i := range 5 {
+		ls.proposeDelta(10)
+		rg.waitOnTotal(t, int64(10*(i+1)))
+	}
+	checkPeerData()
+
+	require_NoError(t, lsm.node().StepDown())
+	lsm = rg.waitOnLeader()
+	require_NotNil(t, lsm)
+	ls = lsm.(*stateAdder)
+
+	for i := range 5 {
+		ls.proposeDelta(10)
+		rg.waitOnTotal(t, int64(50+10*(i+1)))
+	}
+	checkPeerData()
+}
+
+func TestNRGPeersResponse(t *testing.T) {
+	// As a leader.
+	rn := &raft{
+		id:     "me",
+		leader: "me",
+		peers: map[string]*lps{
+			"me":    {li: 0},
+			"peer1": {li: 10},
+			"peer2": {li: 3},
+			"peer3": {li: 0},
+		},
+		pindex:  10,
+		commit:  10,
+		applied: 10,
+	}
+	rn.state.Store(int32(Leader))
+	for _, ps := range rn.Peers() {
+		switch ps.ID {
+		case "me":
+			require_Equal(t, ps.Lag, 0)
+			require_True(t, ps.Current)
+		case "peer1":
+			require_Equal(t, ps.Lag, 0)
+			require_True(t, ps.Current)
+		case "peer2":
+			require_Equal(t, ps.Lag, 7) // We persisted 10 entries, they have only 3.
+			require_False(t, ps.Current)
+		case "peer3":
+			require_Equal(t, ps.Lag, 10) // We persisted 10 entries, they have none.
+			require_False(t, ps.Current)
+		default:
+			t.Fatalf("unexpected peer ID: %s", ps.ID)
+		}
+	}
+
+	// As a follower.
+	rn = &raft{
+		id:     "me",
+		leader: "leader",
+		peers: map[string]*lps{
+			"me":     {li: 0},
+			"leader": {li: 0},
+			"peer":   {li: 0},
+		},
+	}
+	rn.state.Store(int32(Follower))
+	for _, ps := range rn.Peers() {
+		switch ps.ID {
+		case "me":
+			require_Equal(t, ps.Lag, 0)
+			require_True(t, ps.Current)
+		case "leader":
+			require_Equal(t, ps.Lag, 0)
+			require_False(t, ps.Current)
+		default:
+			// As a follower ourselves, we don't know about the state of other followers.
+			require_Equal(t, ps.Lag, 0)
+			require_False(t, ps.Current)
+		}
+	}
+
+	// If we've heard from the leader recently, we report it as current.
+	rn.peers["leader"].ts = time.Now()
+	for _, ps := range rn.Peers() {
+		if ps.ID == "leader" {
+			require_Equal(t, ps.Lag, 0)
+			require_True(t, ps.Current)
+		}
+	}
+}
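Note (not part of the patch): a minimal, self-contained Go sketch of how a caller might interpret the new Peers() semantics. The Peer type below mirrors only the fields used in the diff (ID, Current, Last, Lag); the summarize helper and its messages are hypothetical.

// Hypothetical example, not part of the patch: interpreting Peers() output.
package main

import (
	"fmt"
	"time"
)

// Peer mirrors the fields populated by raft.Peers() in the patch above.
type Peer struct {
	ID      string
	Current bool
	Last    time.Time
	Lag     uint64
}

// summarize prints one line per peer. Under the new semantics: a leader
// reports real lag (committed entries not yet persisted by the replica),
// a follower reports the leader as current only if heard from within two
// heartbeat intervals, and a follower knows nothing about other followers.
func summarize(peers []*Peer) {
	for _, p := range peers {
		switch {
		case p.Current:
			fmt.Printf("%s: current, last seen %v ago\n", p.ID, time.Since(p.Last).Round(time.Millisecond))
		case p.Lag > 0:
			fmt.Printf("%s: behind by %d committed entries\n", p.ID, p.Lag)
		default:
			fmt.Printf("%s: unknown (follower's view of another follower)\n", p.ID)
		}
	}
}

func main() {
	summarize([]*Peer{
		{ID: "leader", Current: true, Last: time.Now()},
		{ID: "replica", Lag: 7, Last: time.Now()},
	})
}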