Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 31 additions & 33 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -3497,22 +3497,48 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
}
}

// If we are/were catching up ignore old catchup subs.
// This could happen when we stall or cancel a catchup.
if !isNew && sub != nil && (!catchingUp || sub != n.catchup.sub) {
// If we are/were catching up ignore old catchup subs, but only if catching up from an older server
// that doesn't send the leader term when catching up. We can reject old catchups from newer subs
// later, just by checking the append entry is on the correct term.
if !isNew && sub != nil && ae.lterm == 0 && (!catchingUp || sub != n.catchup.sub) {
n.Unlock()
n.debug("AppendEntry ignoring old entry from previous catchup")
return
}

// If this term is greater than ours.
if lterm > n.term {
n.term = lterm
n.vote = noVote
if isNew {
n.writeTermVote()
}
if n.State() != Follower {
n.debug("Term higher than ours and we are not a follower: %v, stepping down to %q", n.State(), ae.leader)
n.stepdownLocked(ae.leader)
}
} else if lterm < n.term && sub != nil && (isNew || ae.lterm != 0) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This whole block was moved up, only this statement was changed from:

} else if lterm < n.term && sub != nil && !(catchingUp && ae.lterm == 0) {

// Anything that's below our expected highest term needs to be rejected.
// Unless we're replaying (sub=nil), in which case we'll always continue.
// For backward-compatibility we shouldn't reject if we're being caught up by an old server.
if isNew {
n.debug("Rejected AppendEntry from a leader (%s) with term %d which is less than ours", ae.leader, lterm)
} else {
n.debug("AppendEntry ignoring old entry from previous catchup")
}
n.Unlock()
// No need to respond, the leader will respond with the highest term already.
// We can simply reject here without sending additional responses.
return
}

// Check state if we are catching up.
var resetCatchingUp bool
if catchingUp {
if cs := n.catchup; cs != nil && n.pterm >= cs.cterm && n.pindex >= cs.cindex {
// If we are here we are good, so if we have a catchup pending we can cancel.
n.cancelCatchup()
// Reset our notion of catching up.
resetCatchingUp = true
catchingUp = false
} else if isNew {
var ar *appendEntryResponse
var inbox string
Expand All @@ -3532,34 +3558,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
}
}

// If this term is greater than ours.
if lterm > n.term {
n.term = lterm
n.vote = noVote
if isNew {
n.writeTermVote()
}
if n.State() != Follower {
n.debug("Term higher than ours and we are not a follower: %v, stepping down to %q", n.State(), ae.leader)
n.stepdownLocked(ae.leader)
}
} else if lterm < n.term && sub != nil && !(catchingUp && ae.lterm == 0) {
// Anything that's below our expected highest term needs to be rejected.
// Unless we're replaying (sub=nil), in which case we'll always continue.
// For backward-compatibility we shouldn't reject if we're being caught up by an old server.
n.debug("Rejected AppendEntry from a leader (%s) with term %d which is less than ours", ae.leader, lterm)
ar := newAppendEntryResponse(n.term, n.pindex, n.id, false)
n.Unlock()
n.sendRPC(ae.reply, _EMPTY_, ar.encode(arbuf))
arPool.Put(ar)
return
}

// Reset after checking the term is correct, because we use catchingUp in a condition above.
if resetCatchingUp {
catchingUp = false
}

if isNew && n.leader != ae.leader && n.State() == Follower {
n.debug("AppendEntry updating leader to %q", ae.leader)
n.updateLeader(ae.leader)
Expand Down
108 changes: 107 additions & 1 deletion server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2530,7 +2530,6 @@ func TestNRGRejectAppendEntryDuringCatchupFromPreviousLeader(t *testing.T) {

// Now send the second catchup entry.
n.processAppendEntry(aeMsg2, nsub)
require_True(t, n.catchup == nil)
require_Equal(t, n.pterm, 1)

// Under the old behavior this entry is wrongly accepted.
Expand Down Expand Up @@ -3286,6 +3285,113 @@ func TestNRGTruncateOnStartup(t *testing.T) {
require_Equal(t, state.NumDeleted, 0)
}

func TestNRGLeaderCatchupHandling(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"
aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 1, entries: entries})
aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 2, entries: entries})

n.processAppendEntry(aeMsg1, n.aesub)
n.processAppendEntry(aeMsg2, n.aesub)
n.processAppendEntry(aeMsg3, n.aesub)
require_Equal(t, n.pindex, 3)

n.switchToLeader()

catchupReply := "$TEST"
nc, err := nats.Connect(n.s.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
require_NoError(t, err)
defer nc.Close()

sub, err := nc.SubscribeSync(catchupReply)
require_NoError(t, err)
defer sub.Drain()
require_NoError(t, nc.Flush())

// Simulate a follower that's up-to-date with only the first message.
n.catchupFollower(&appendEntryResponse{success: false, term: 1, index: 1, reply: catchupReply})

// Should receive all messages the leader knows up to this point.
msg, err := sub.NextMsg(500 * time.Millisecond)
require_NoError(t, err)
ae, err := n.decodeAppendEntry(msg.Data, nil, _EMPTY_)
require_NoError(t, err)
require_Equal(t, ae.pterm, 1)
require_Equal(t, ae.pindex, 1)

msg, err = sub.NextMsg(500 * time.Millisecond)
require_NoError(t, err)
ae, err = n.decodeAppendEntry(msg.Data, nil, _EMPTY_)
require_NoError(t, err)
require_Equal(t, ae.pterm, 1)
require_Equal(t, ae.pindex, 2)
}

func TestNRGNewEntriesFromOldLeaderResetsWALDuringCatchup(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"
aeMsg1 := encode(t, &appendEntry{leader: nats0, lterm: 20, term: 20, commit: 0, pterm: 0, pindex: 0, entries: entries})
aeMsg2 := encode(t, &appendEntry{leader: nats0, lterm: 20, term: 20, commit: 0, pterm: 20, pindex: 1, entries: entries})
aeMsg3 := encode(t, &appendEntry{leader: nats0, lterm: 20, term: 20, commit: 0, pterm: 20, pindex: 2, entries: entries})

aeMsg1Fork := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
aeMsg2Fork := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 1, entries: entries})

// Trigger a catchup.
n.processAppendEntry(aeMsg2, n.aesub)
validateCatchup := func() {
t.Helper()
require_True(t, n.catchup != nil)
require_Equal(t, n.catchup.cterm, 20)
require_Equal(t, n.catchup.cindex, 1)
}
validateCatchup()

// Catchup the first missed entry.
csub := n.catchup.sub
n.processAppendEntry(aeMsg1, csub)
require_Equal(t, n.pindex, 1)
require_Equal(t, n.pterm, 20)

// Would previously stall the catchup and restart it with a previous leader.
n.catchup.pindex = aeMsg1.pindex + 1
n.catchup.active = time.Time{}
n.processAppendEntry(aeMsg1Fork, n.aesub)
require_Equal(t, n.pindex, 1)
require_Equal(t, n.pterm, 20)
validateCatchup()

// Would previously reset the WAL.
n.processAppendEntry(aeMsg2Fork, n.aesub)
require_Equal(t, n.pindex, 1)
require_Equal(t, n.pterm, 20)
validateCatchup()

// Now the catchup should continue, undisturbed by an old leader sending append entries.
n.processAppendEntry(aeMsg2, csub)
require_Equal(t, n.pindex, 2)
require_Equal(t, n.pterm, 20)
require_True(t, n.catchup == nil)

// A remaining catchup entry can still be ingested, even if the catchup state itself is gone.
n.processAppendEntry(aeMsg3, csub)
require_Equal(t, n.pindex, 3)
require_Equal(t, n.pterm, 20)
}

// This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before
// proposing the next one.
// The test may fail if:
Expand Down
Loading