Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 24 additions & 19 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -2729,7 +2729,6 @@ func (n *raft) runAsLeader() {
n.stepdown(noLeader)
return
}
n.trackPeer(vresp.peer)
case <-n.reqs.ch:
// Because of drain() it is possible that we get nil from popOne().
if voteReq, ok := n.reqs.popOne(); ok {
Expand Down Expand Up @@ -3093,7 +3092,7 @@ func (n *raft) applyCommit(index uint64) error {

if lp, ok := n.peers[newPeer]; !ok {
// We are not tracking this one automatically so we need to bump cluster size.
n.peers[newPeer] = &lps{time.Now(), 0, true}
n.peers[newPeer] = &lps{time.Time{}, 0, true}
} else {
// Mark as added.
lp.kp = true
Expand Down Expand Up @@ -3493,6 +3492,17 @@ func (n *raft) updateLeader(newLeader string) {
}
}
}
// Reset last seen timestamps.
// If we're the leader we track everyone, and don't reset.
// But if we're a follower we only track the leader, and reset all others.
if newLeader != n.id {
for peer, ps := range n.peers {
if peer == newLeader {
continue
}
ps.ts = time.Time{}
}
}
}

// processAppendEntry will process an appendEntry. This is called either
Expand Down Expand Up @@ -3583,15 +3593,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
// sub, rather than a catch-up sub.
isNew := sub != nil && sub == n.aesub

// Track leader directly
if isNew && ae.leader != noLeader {
if ps := n.peers[ae.leader]; ps != nil {
ps.ts = time.Now()
} else {
n.peers[ae.leader] = &lps{time.Now(), 0, true}
}
}

// If we are/were catching up ignore old catchup subs, but only if catching up from an older server
// that doesn't send the leader term when catching up. We can reject old catchups from newer subs
// later, just by checking the append entry is on the correct term.
Expand Down Expand Up @@ -3663,6 +3664,16 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.updateLeadChange(false)
}

// Track leader directly
// But, do so after all consistency checks so we don't track an old leader.
if isNew && ae.leader != noLeader && ae.leader == n.leader {
if ps := n.peers[ae.leader]; ps != nil {
ps.ts = time.Now()
} else {
n.peers[ae.leader] = &lps{time.Now(), 0, true}
}
}

if ae.pterm != n.pterm || ae.pindex != n.pindex {
// Check if this is a lower or equal index than what we were expecting.
if ae.pindex <= n.pindex {
Expand Down Expand Up @@ -3831,10 +3842,8 @@ CONTINUE:
case EntryAddPeer:
if newPeer := string(e.Data); len(newPeer) == idLen {
// Track directly, but wait for commit to be official
if ps := n.peers[newPeer]; ps != nil {
ps.ts = time.Now()
} else {
n.peers[newPeer] = &lps{time.Now(), 0, false}
if _, ok := n.peers[newPeer]; !ok {
n.peers[newPeer] = &lps{time.Time{}, 0, false}
}
// Store our peer in our global peer map for all peers.
peers.LoadOrStore(newPeer, newPeer)
Expand Down Expand Up @@ -4385,10 +4394,6 @@ func (n *raft) processVoteRequest(vr *voteRequest) error {
}
n.debug("Received a voteRequest %+v", vr)

if err := n.trackPeer(vr.candidate); err != nil {
return err
}

n.Lock()

vresp := &voteResponse{n.term, n.id, false, n.pindex == 0}
Expand Down
76 changes: 76 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3620,6 +3620,82 @@ func TestNRGDrainAndReplaySnapshot(t *testing.T) {
require_Equal(t, n.hcommit, 0)
}

// TestNRGTrackPeerActive checks the peer "last seen" values reported via Raftz
// by a non-leader of the meta group, both before and after a leader step-down.
//
// The leader should track timestamps for all peers. Each follower should only
// track the leader, otherwise we would get outdated timestamps.
func TestNRGTrackPeerActive(t *testing.T) {
	// Every peer that reports a last-seen value must have been seen within
	// the last second. Peers with an empty value are not being tracked and
	// are ignored.
	assertRecentlySeen := func(peers map[string]RaftzGroupPeer) {
		for _, p := range peers {
			if p.LastSeen != _EMPTY_ {
				since, err := time.ParseDuration(p.LastSeen)
				require_NoError(t, err)
				require_LessThan(t, since, time.Second)
			}
		}
	}

	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	ml := c.leader()
	rs := c.randomNonLeader()

	// The first server that is neither the current leader nor the observed
	// non-leader is handed to StepDown below.
	var preferred *Server
	for _, srv := range c.servers {
		if srv != ml && srv != rs {
			preferred = srv
			break
		}
	}
	require_NotNil(t, preferred)

	// Reads the meta group's peer report from the observed non-leader.
	metaPeers := func() map[string]RaftzGroupPeer {
		report := rs.Raftz(&RaftzOptions{})
		return (*report)[DEFAULT_SYSTEM_ACCOUNT][defaultMetaGroupName].Peers
	}

	// Wait longer than the one second threshold so stale entries would show.
	time.Sleep(2 * time.Second)
	assertRecentlySeen(metaPeers())

	// Ask the meta leader to step down, passing the remaining server's node name.
	meta := ml.getJetStream().getMetaGroup()
	require_NoError(t, meta.StepDown(preferred.NodeName()))

	// Same check again once the step-down has had time to take effect.
	time.Sleep(2 * time.Second)
	assertRecentlySeen(metaPeers())
}

// TestNRGLostQuorum walks a single in-memory raft node from follower to
// candidate to leader and asserts what Quorum() and lostQuorum() report at
// each stage. A client subscription answers the node's vote request with a
// granted vote from a peer named "random".
func TestNRGLostQuorum(t *testing.T) {
	n, cleanup := initSingleMemRaftNode(t)
	defer cleanup()

	// Quorum() and lostQuorum() are expected to be each other's opposite at
	// every point checked in this test.
	expectQuorum := func(have bool) {
		if have {
			require_True(t, n.Quorum())
			require_False(t, n.lostQuorum())
			return
		}
		require_False(t, n.Quorum())
		require_True(t, n.lostQuorum())
	}

	require_Equal(t, n.State(), Follower)
	expectQuorum(false)

	nc, err := nats.Connect(n.s.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
	require_NoError(t, err)
	defer nc.Close()

	// Respond to a vote request.
	grantVote := func(m *nats.Msg) {
		vr := decodeVoteRequest(m.Data, m.Reply)
		reply := &voteResponse{term: vr.term, peer: "random", granted: true}
		m.Respond(reply.encode())
	}
	sub, err := nc.Subscribe(n.vsubj, grantVote)
	require_NoError(t, err)
	defer sub.Drain()
	require_NoError(t, nc.Flush())

	// Switch to candidate and make sure we properly track the peer as active.
	n.switchToCandidate()
	require_Equal(t, n.State(), Candidate)
	expectQuorum(false)

	// Running the candidate loop should collect the granted vote and end
	// with this node as leader, holding quorum.
	n.runAsCandidate()
	require_Equal(t, n.State(), Leader)
	expectQuorum(true)
}

// This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before
// proposing the next one.
// The test may fail if:
Expand Down