diff --git a/server/raft.go b/server/raft.go index a250a6bdb5e..773152f4eb5 100644 --- a/server/raft.go +++ b/server/raft.go @@ -3497,22 +3497,48 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } } - // If we are/were catching up ignore old catchup subs. - // This could happen when we stall or cancel a catchup. - if !isNew && sub != nil && (!catchingUp || sub != n.catchup.sub) { + // If we are/were catching up ignore old catchup subs, but only if catching up from an older server + // that doesn't send the leader term when catching up. We can reject old catchups from newer subs + // later, just by checking the append entry is on the correct term. + if !isNew && sub != nil && ae.lterm == 0 && (!catchingUp || sub != n.catchup.sub) { n.Unlock() n.debug("AppendEntry ignoring old entry from previous catchup") return } + // If this term is greater than ours. + if lterm > n.term { + n.term = lterm + n.vote = noVote + if isNew { + n.writeTermVote() + } + if n.State() != Follower { + n.debug("Term higher than ours and we are not a follower: %v, stepping down to %q", n.State(), ae.leader) + n.stepdownLocked(ae.leader) + } + } else if lterm < n.term && sub != nil && (isNew || ae.lterm != 0) { + // Anything that's below our expected highest term needs to be rejected. + // Unless we're replaying (sub=nil), in which case we'll always continue. + // For backward-compatibility we shouldn't reject if we're being caught up by an old server. + if isNew { + n.debug("Rejected AppendEntry from a leader (%s) with term %d which is less than ours", ae.leader, lterm) + } else { + n.debug("AppendEntry ignoring old entry from previous catchup") + } + n.Unlock() + // No need to respond, the leader will respond with the highest term already. + // We can simply reject here without sending additional responses. + return + } + // Check state if we are catching up. - var resetCatchingUp bool if catchingUp { if cs := n.catchup; cs != nil && n.pterm >= cs.cterm && n.pindex >= cs.cindex { // If we are here we are good, so if we have a catchup pending we can cancel. n.cancelCatchup() // Reset our notion of catching up. - resetCatchingUp = true + catchingUp = false } else if isNew { var ar *appendEntryResponse var inbox string @@ -3532,34 +3558,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } } - // If this term is greater than ours. - if lterm > n.term { - n.term = lterm - n.vote = noVote - if isNew { - n.writeTermVote() - } - if n.State() != Follower { - n.debug("Term higher than ours and we are not a follower: %v, stepping down to %q", n.State(), ae.leader) - n.stepdownLocked(ae.leader) - } - } else if lterm < n.term && sub != nil && !(catchingUp && ae.lterm == 0) { - // Anything that's below our expected highest term needs to be rejected. - // Unless we're replaying (sub=nil), in which case we'll always continue. - // For backward-compatibility we shouldn't reject if we're being caught up by an old server. - n.debug("Rejected AppendEntry from a leader (%s) with term %d which is less than ours", ae.leader, lterm) - ar := newAppendEntryResponse(n.term, n.pindex, n.id, false) - n.Unlock() - n.sendRPC(ae.reply, _EMPTY_, ar.encode(arbuf)) - arPool.Put(ar) - return - } - - // Reset after checking the term is correct, because we use catchingUp in a condition above. - if resetCatchingUp { - catchingUp = false - } - if isNew && n.leader != ae.leader && n.State() == Follower { n.debug("AppendEntry updating leader to %q", ae.leader) n.updateLeader(ae.leader) diff --git a/server/raft_test.go b/server/raft_test.go index 8fdf93b7944..a4625302825 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -2530,7 +2530,6 @@ func TestNRGRejectAppendEntryDuringCatchupFromPreviousLeader(t *testing.T) { // Now send the second catchup entry. n.processAppendEntry(aeMsg2, nsub) - require_True(t, n.catchup == nil) require_Equal(t, n.pterm, 1) // Under the old behavior this entry is wrongly accepted. @@ -3286,6 +3285,113 @@ func TestNRGTruncateOnStartup(t *testing.T) { require_Equal(t, state.NumDeleted, 0) } +func TestNRGLeaderCatchupHandling(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 1, entries: entries}) + aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 2, entries: entries}) + + n.processAppendEntry(aeMsg1, n.aesub) + n.processAppendEntry(aeMsg2, n.aesub) + n.processAppendEntry(aeMsg3, n.aesub) + require_Equal(t, n.pindex, 3) + + n.switchToLeader() + + catchupReply := "$TEST" + nc, err := nats.Connect(n.s.ClientURL(), nats.UserInfo("admin", "s3cr3t!")) + require_NoError(t, err) + defer nc.Close() + + sub, err := nc.SubscribeSync(catchupReply) + require_NoError(t, err) + defer sub.Drain() + require_NoError(t, nc.Flush()) + + // Simulate a follower that's up-to-date with only the first message. + n.catchupFollower(&appendEntryResponse{success: false, term: 1, index: 1, reply: catchupReply}) + + // Should receive all messages the leader knows up to this point. + msg, err := sub.NextMsg(500 * time.Millisecond) + require_NoError(t, err) + ae, err := n.decodeAppendEntry(msg.Data, nil, _EMPTY_) + require_NoError(t, err) + require_Equal(t, ae.pterm, 1) + require_Equal(t, ae.pindex, 1) + + msg, err = sub.NextMsg(500 * time.Millisecond) + require_NoError(t, err) + ae, err = n.decodeAppendEntry(msg.Data, nil, _EMPTY_) + require_NoError(t, err) + require_Equal(t, ae.pterm, 1) + require_Equal(t, ae.pindex, 2) +} + +func TestNRGNewEntriesFromOldLeaderResetsWALDuringCatchup(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + aeMsg1 := encode(t, &appendEntry{leader: nats0, lterm: 20, term: 20, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeMsg2 := encode(t, &appendEntry{leader: nats0, lterm: 20, term: 20, commit: 0, pterm: 20, pindex: 1, entries: entries}) + aeMsg3 := encode(t, &appendEntry{leader: nats0, lterm: 20, term: 20, commit: 0, pterm: 20, pindex: 2, entries: entries}) + + aeMsg1Fork := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeMsg2Fork := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 1, entries: entries}) + + // Trigger a catchup. + n.processAppendEntry(aeMsg2, n.aesub) + validateCatchup := func() { + t.Helper() + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.cterm, 20) + require_Equal(t, n.catchup.cindex, 1) + } + validateCatchup() + + // Catchup the first missed entry. + csub := n.catchup.sub + n.processAppendEntry(aeMsg1, csub) + require_Equal(t, n.pindex, 1) + require_Equal(t, n.pterm, 20) + + // Would previously stall the catchup and restart it with a previous leader. + n.catchup.pindex = aeMsg1.pindex + 1 + n.catchup.active = time.Time{} + n.processAppendEntry(aeMsg1Fork, n.aesub) + require_Equal(t, n.pindex, 1) + require_Equal(t, n.pterm, 20) + validateCatchup() + + // Would previously reset the WAL. + n.processAppendEntry(aeMsg2Fork, n.aesub) + require_Equal(t, n.pindex, 1) + require_Equal(t, n.pterm, 20) + validateCatchup() + + // Now the catchup should continue, undisturbed by an old leader sending append entries. + n.processAppendEntry(aeMsg2, csub) + require_Equal(t, n.pindex, 2) + require_Equal(t, n.pterm, 20) + require_True(t, n.catchup == nil) + + // A remaining catchup entry can still be ingested, even if the catchup state itself is gone. + n.processAppendEntry(aeMsg3, csub) + require_Equal(t, n.pindex, 3) + require_Equal(t, n.pterm, 20) +} + // This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before // proposing the next one. // The test may fail if: