Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -2061,13 +2061,34 @@ func (n *raft) Peers() []*Peer {

var peers []*Peer
for id, ps := range n.peers {
var current bool
var lag uint64
if n.commit > ps.li {
lag = n.commit - ps.li
if id == n.id {
// We are current and have no lag when compared with ourselves.
current = true
} else if n.id == n.leader {
// We are the leader, we know how many entries this replica has persisted.
// Lag is determined by how many entries we have quorum on in our log that haven't yet
// been persisted on the replica. They are current if there's no lag.
// This will show all peers that are part of quorum as "current".
if n.commit > ps.li {
lag = n.commit - ps.li
}
current = lag == 0
} else if id == n.leader {
// This peer is the leader, we don't know our lag, but we can report
// on whether we've seen the leader recently.
okInterval := hbInterval * 2
current = time.Since(ps.ts) <= okInterval
} else {
// The remaining condition is another follower that we're not in contact with.
// We intentionally leave current and lag as empty.
current, lag = false, 0
}

p := &Peer{
ID: id,
Current: id == n.leader || ps.li >= n.applied,
Current: current,
Last: ps.ts,
Lag: lag,
}
Expand Down Expand Up @@ -3838,14 +3859,17 @@ func (n *raft) updateLeader(newLeader string) {
}
}
}
// Reset last seen timestamps.
// Reset last seen timestamps and indices.
// If we are (or were) the leader we track(ed) everyone, and don't reset.
// But if we're a follower we only track the leader, and reset all others.
if newLeader != n.id && !wasLeader {
for peer, ps := range n.peers {
// Always reset last replicated index.
ps.li = 0
if peer == newLeader {
continue
}
// Only reset the last seen timestamp if this peer is not the leader.
ps.ts = time.Time{}
}
}
Expand Down
136 changes: 136 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5304,3 +5304,139 @@ func TestNRGReplayAddPeerKeepsClusterSize(t *testing.T) {
n.WaitForStop()
fs.Stop()
}

func TestNRGTrackPeerLag(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

rg := c.createRaftGroup("TEST", 3, newStateAdder)
lsm := rg.waitOnLeader()
require_NotNil(t, lsm)
ls := lsm.(*stateAdder)

checkPeerData := func() {
for _, sm := range rg {
rn := sm.node().(*raft)
rn.RLock()
leader := rn.leader
isLeader := RaftState(rn.state.Load()) == Leader
for id, ps := range rn.peers {
// If this peer is the leader (but not ourselves), then these values need to be populated.
if isLeader && id != rn.id {
if li := ps.li; li == 0 {
rn.RUnlock()
t.Fatal("require last replicated index to be set")
}
if ts := ps.ts; ts.IsZero() {
rn.RUnlock()
t.Fatal("require last seen timestamp to be set")
}
continue
}
if li := ps.li; li != 0 {
rn.RUnlock()
t.Fatalf("require equal, but got: %v != %v", ps.li, 0)
}
if ts := ps.ts; id != rn.leader && !ts.IsZero() {
rn.RUnlock()
t.Fatalf("require zero ts, but got: %v", ts)
}
}
rn.RUnlock()

// This test expects no lag and all nodes as current (except for followers as seen by other followers).
for _, ps := range rn.Peers() {
require_Equal(t, ps.Lag, 0)
require_Equal(t, ps.Current, ps.ID == rn.id || ps.ID == leader || isLeader)
}
}
}

for i := range 5 {
ls.proposeDelta(10)
rg.waitOnTotal(t, int64(10*(i+1)))
}
checkPeerData()

require_NoError(t, lsm.node().StepDown())
lsm = rg.waitOnLeader()
require_NotNil(t, lsm)
ls = lsm.(*stateAdder)

for i := range 5 {
ls.proposeDelta(10)
rg.waitOnTotal(t, int64(50+10*(i+1)))
}
checkPeerData()
}

func TestNRGPeersResponse(t *testing.T) {
// As a leader.
rn := &raft{
id: "me",
leader: "me",
peers: map[string]*lps{
"me": {li: 0},
"peer1": {li: 10},
"peer2": {li: 3},
"peer3": {li: 0},
},
pindex: 10,
commit: 10,
applied: 10,
}
rn.state.Store(int32(Leader))
for _, ps := range rn.Peers() {
switch ps.ID {
case "me":
require_Equal(t, ps.Lag, 0)
require_True(t, ps.Current)
case "peer1":
require_Equal(t, ps.Lag, 0)
require_True(t, ps.Current)
case "peer2":
require_Equal(t, ps.Lag, 7) // We persisted 10 entries, they have only 3.
require_False(t, ps.Current)
case "peer3":
require_Equal(t, ps.Lag, 10) // We persisted 10 entries, they have none.
require_False(t, ps.Current)
default:
t.Fatalf("unexpected peer ID: %s", ps.ID)
}
}

// As a follower.
rn = &raft{
id: "me",
leader: "leader",
peers: map[string]*lps{
"me": {li: 0},
"leader": {li: 0},
"peer": {li: 0},
},
}
rn.state.Store(int32(Follower))
for _, ps := range rn.Peers() {
switch ps.ID {
case "me":
require_Equal(t, ps.Lag, 0)
require_True(t, ps.Current)
case "leader":
require_Equal(t, ps.Lag, 0)
require_False(t, ps.Current)
default:
// As a follower ourselves, we don't know about the state of other followers.
require_Equal(t, ps.Lag, 0)
require_False(t, ps.Current)
}
}

// If we've heard from the leader recently, we report it as current.
rn.peers["leader"].ts = time.Now()
for _, ps := range rn.Peers() {
if ps.ID == "leader" {
require_Equal(t, ps.Lag, 0)
require_True(t, ps.Current)
}
}
}
Loading