Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -3387,8 +3387,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {

} else {
n.debug("AppendEntry did not match %d %d with %d %d", ae.pterm, ae.pindex, n.pterm, n.pindex)
// Reset our term.
n.term = n.pterm
if ae.pindex > n.pindex {
// Setup our state for catching up.
inbox := n.createCatchup(ae)
Expand Down
75 changes: 75 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,3 +702,78 @@ func TestNRGCandidateDoesntRevertTermAfterOldAE(t *testing.T) {
// The candidate must not have reverted back to term 6.
require_NotEqual(t, follower.term, 6)
}

func TestNRGTermDoesntRollBackToPtermOnCatchup(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, _ := jsClientConnect(t, c.leader(), nats.UserInfo("admin", "s3cr3t!"))
defer nc.Close()

rg := c.createMemRaftGroup("TEST", 3, newStateAdder)
rg.waitOnLeader()

// Propose some entries so that we have entries in the log that have pterm 1.
lsm := rg.leader().(*stateAdder)
for i := 0; i < 5; i++ {
lsm.proposeDelta(1)
rg.waitOnTotal(t, int64(i)+1)
}

// Check that everyone is where they are supposed to be.
rg.lockAll()
for _, n := range rg {
rn := n.node().(*raft)
require_Equal(t, rn.term, 1)
require_Equal(t, rn.pterm, 1)
require_Equal(t, rn.pindex, 6)
}
rg.unlockAll()

// Force a stepdown so that we move up to term 2.
rg.leader().node().(*raft).switchToFollower(noLeader)
rg.waitOnLeader()
leader := rg.leader().node().(*raft)

// Now make sure everyone has moved up to term 2. Additionally we're
// going to prune back the follower logs to term 1 as this is what will
// create the right conditions for the catchup.
rg.lockAll()
for _, n := range rg {
rn := n.node().(*raft)
require_Equal(t, rn.term, 2)

if !rn.Leader() {
rn.truncateWAL(1, 6) // This will overwrite rn.term, so...
rn.term = 2 // ... we'll set it back manually.
require_Equal(t, rn.pterm, 1)
require_Equal(t, rn.pindex, 6)
}
}
rg.unlockAll()

arInbox := nc.NewRespInbox()
arCh := make(chan *nats.Msg, 2)
_, err := nc.ChanSubscribe(arInbox, arCh)
require_NoError(t, err)

// In order to trip this condition, we need to send an append entry that
// will trick the followers into running a catchup. In the process they
// were setting the term back to pterm which is incorrect.
ae := newAppendEntry(leader.id, leader.term, leader.commit, leader.pterm, leader.pindex, nil)
b, err := ae.encode(nil)
require_NoError(t, err)
require_NoError(t, nc.PublishMsg(&nats.Msg{
Subject: fmt.Sprintf(raftAppendSubj, "TEST"),
Reply: arInbox,
Data: b,
}))

// Wait for both followers to respond to the append entry and then verify
// that none of the nodes should have reverted back to term 1.
require_ChanRead(t, arCh, time.Second*5) // First follower
require_ChanRead(t, arCh, time.Second*5) // Second follower
for _, n := range rg {
require_NotEqual(t, n.node().Term(), 1)
}
}