Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -3271,6 +3271,10 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.debug("Term higher than ours and we are not a follower: %v, stepping down to %q", n.State(), ae.leader)
n.stepdown.push(ae.leader)
}
} else if ae.term < n.term && !catchingUp && isNew {
n.debug("Ignoring AppendEntry from a leader (%s) with term %d which is less than ours", ae.leader, ae.term)
n.Unlock()
return
}

if isNew && n.leader != ae.leader && n.State() == Follower {
Expand Down
47 changes: 47 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,53 @@ func TestNRGObserverMode(t *testing.T) {
}
}

func TestNRGAEFromOldLeader(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, _ := jsClientConnect(t, c.leader(), nats.UserInfo("admin", "s3cr3t!"))
defer nc.Close()

rg := c.createMemRaftGroup("TEST", 3, newStateAdder)
rg.waitOnLeader()

// Listen out for catchup requests.
ch := make(chan *nats.Msg, 16)
_, err := nc.ChanSubscribe(fmt.Sprintf(raftCatchupReply, ">"), ch)
require_NoError(t, err)

// Start next term so that we can reuse term 1 in the next step.
leader := rg.leader().node().(*raft)
leader.StepDown()
time.Sleep(time.Millisecond * 100)
rg.waitOnLeader()
require_Equal(t, leader.Term(), 2)
leader = rg.leader().node().(*raft)

// Send an append entry with an outdated term. Beforehand, doing
// so would have caused a WAL reset and then would have triggered
// a Raft-level catchup.
ae := &appendEntry{
term: 1,
pindex: 0,
leader: leader.id,
reply: nc.NewRespInbox(),
}
payload, err := ae.encode(nil)
require_NoError(t, err)
resp, err := nc.Request(leader.asubj, payload, time.Second)
require_NoError(t, err)

// Wait for the response, the server should have rejected it.
ar := leader.decodeAppendEntryResponse(resp.Data)
require_NotNil(t, ar)
require_Equal(t, ar.success, false)

// No catchup should happen at this point because no reset should
// have happened.
require_NoChanRead(t, ch, time.Second*2)
}

// TestNRGSimpleElection tests that a simple election succeeds. It is
// simple because the group hasn't processed any entries and hasn't
// suffered any interruptions of any kind, therefore there should be
Expand Down