Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -3187,10 +3187,9 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
// another node has taken on the leader role already, so we should convert
// to a follower of that node instead.
if n.State() == Candidate {
// Ignore old terms, otherwise we might end up stepping down incorrectly.
// Needs to be ahead of our pterm (last log index), as an isolated node
// could have bumped its vote term up considerably past this point.
if ae.term >= n.pterm {
// If we have a leader in the current term or higher, we should stepdown,
// write the term and vote if the term of the request is higher.
if ae.term >= n.term {
// If the append entry term is newer than the current term, erase our
// vote.
if ae.term > n.term {
Expand Down
61 changes: 61 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -844,3 +844,64 @@ func TestNRGNoResetOnAppendEntryResponse(t *testing.T) {
require_NotEqual(t, leader.pterm, 0)
require_NotEqual(t, leader.pindex, 0)
}

func TestNRGCandidateDontStepdownDueToLeaderOfPreviousTerm(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
c.waitOnLeader()

nc, _ := jsClientConnect(t, c.leader(), nats.UserInfo("admin", "s3cr3t!"))
defer nc.Close()

rg := c.createRaftGroup("TEST", 3, newStateAdder)
rg.waitOnLeader()

var (
candidatePterm uint64 = 50
candidatePindex uint64 = 70
candidateTerm uint64 = 100
)

// Create a candidate that has received entries while they were a follower in a previous term
candidate := rg.nonLeader().node().(*raft)
candidate.Lock()
candidate.switchState(Candidate)
candidate.pterm = candidatePterm
candidate.pindex = candidatePindex
candidate.term = candidateTerm
candidate.Unlock()

// Leader term is behind candidate
leader := rg.leader().node().(*raft)
leader.Lock()
leader.term = candidatePterm
leader.pterm = candidatePterm
leader.pindex = candidatePindex
leader.Unlock()

// Subscribe to the append entry subject.
sub, err := nc.SubscribeSync(leader.asubj)
require_NoError(t, err)

// Get the first append entry that we receive, should be heartbeat from leader of prev term
msg, err := sub.NextMsg(5 * time.Second)
require_NoError(t, err)

// Stop nodes from progressing so we can check state
rg.lockAll()
defer rg.unlockAll()

// Decode the append entry
ae, err := leader.decodeAppendEntry(msg.Data, nil, msg.Reply)
require_NoError(t, err)

// Check that the append entry is from the leader
require_Equal(t, ae.leader, leader.id)
// Check that it came from the leader before it updated its term with the response from the candidate
require_Equal(t, ae.term, candidatePterm)

// Check that the candidate hasn't stepped down
require_Equal(t, candidate.State(), Candidate)
// Check that the candidate's term is still ahead of the leader's term
require_True(t, candidate.term > ae.term)
}