Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1422,11 +1422,6 @@ func (n *raft) isCurrent(includeForwardProgress bool) bool {
return true
}

// Check here on catchup status.
if cs := n.catchup; cs != nil && n.pterm >= cs.cterm && n.pindex >= cs.cindex {
n.cancelCatchup()
}

// Check to see that we have heard from the current leader lately.
if n.leader != noLeader && n.leader != n.id && n.catchup == nil {
okInterval := int64(hbInterval) * 2
Expand All @@ -1437,7 +1432,9 @@ func (n *raft) isCurrent(includeForwardProgress bool) bool {
}
}
if cs := n.catchup; cs != nil {
// We're actively catching up, can't mark current even if commit==applied.
n.debug("Not current, still catching up pindex=%d, cindex=%d", n.pindex, cs.cindex)
return false
}

if n.commit == n.applied {
Expand Down
141 changes: 141 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1700,3 +1700,144 @@ func TestNRGMemoryWALEmptiesSnapshotsDir(t *testing.T) {
require_NoError(t, err)
require_Len(t, len(files), 0)
}

func TestNRGHealthCheckWaitForCatchup(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"

// Timeline
aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries})
aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 2, pterm: 1, pindex: 2, entries: entries})
aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 3, pterm: 1, pindex: 3, entries: nil})

// Switch follower into catchup.
n.processAppendEntry(aeHeartbeat, n.aesub)
require_True(t, n.catchup != nil)
require_Equal(t, n.catchup.pterm, 0) // n.pterm
require_Equal(t, n.catchup.pindex, 0) // n.pindex
require_Equal(t, n.catchup.cterm, aeHeartbeat.term)
require_Equal(t, n.catchup.cindex, aeHeartbeat.pindex)

// Catchup first message.
n.processAppendEntry(aeMsg1, n.catchup.sub)
require_Equal(t, n.pindex, 1)
require_False(t, n.Healthy())

// Catchup second message.
n.processAppendEntry(aeMsg2, n.catchup.sub)
require_Equal(t, n.pindex, 2)
require_Equal(t, n.commit, 1)
require_False(t, n.Healthy())

// If we apply the entry sooner than we receive the next catchup message,
// should not mark as healthy since we're still in catchup.
n.Applied(1)
require_False(t, n.Healthy())

// Catchup third message.
n.processAppendEntry(aeMsg3, n.catchup.sub)
require_Equal(t, n.pindex, 3)
require_Equal(t, n.commit, 2)
n.Applied(2)
require_False(t, n.Healthy())

// Heartbeat stops catchup.
n.processAppendEntry(aeHeartbeat, n.aesub)
require_True(t, n.catchup == nil)
require_Equal(t, n.pindex, 3)
require_Equal(t, n.commit, 3)
require_False(t, n.Healthy())

// Still need to wait for the last entry to be applied.
n.Applied(3)
require_True(t, n.Healthy())
}

func TestNRGHealthCheckWaitForDoubleCatchup(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"

// Timeline
aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries})
aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 2, pterm: 1, pindex: 2, entries: nil})
aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 2, pterm: 1, pindex: 2, entries: entries})
aeHeartbeat2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 3, pterm: 1, pindex: 3, entries: nil})

// Switch follower into catchup.
n.processAppendEntry(aeHeartbeat1, n.aesub)
require_True(t, n.catchup != nil)
require_Equal(t, n.catchup.pterm, 0) // n.pterm
require_Equal(t, n.catchup.pindex, 0) // n.pindex
require_Equal(t, n.catchup.cterm, aeHeartbeat1.term)
require_Equal(t, n.catchup.cindex, aeHeartbeat1.pindex)

// Catchup first message.
n.processAppendEntry(aeMsg1, n.catchup.sub)
require_Equal(t, n.pindex, 1)
require_False(t, n.Healthy())

// We miss this message, since we're catching up.
n.processAppendEntry(aeMsg3, n.aesub)
require_True(t, n.catchup != nil)
require_Equal(t, n.pindex, 1)
require_False(t, n.Healthy())

// We also miss the heartbeat, since we're catching up.
n.processAppendEntry(aeHeartbeat2, n.aesub)
require_True(t, n.catchup != nil)
require_Equal(t, n.pindex, 1)
require_False(t, n.Healthy())

// Catchup second message, this will stop catchup.
n.processAppendEntry(aeMsg2, n.catchup.sub)
require_Equal(t, n.pindex, 2)
require_Equal(t, n.commit, 1)
n.Applied(1)
require_False(t, n.Healthy())

// We expect to still be in catchup, waiting for a heartbeat or new append entry to reset.
require_True(t, n.catchup != nil)
require_Equal(t, n.catchup.cterm, aeHeartbeat1.term)
require_Equal(t, n.catchup.cindex, aeHeartbeat1.pindex)

// We now get a 'future' heartbeat, should restart catchup.
n.processAppendEntry(aeHeartbeat2, n.aesub)
require_True(t, n.catchup != nil)
require_Equal(t, n.catchup.pterm, 1) // n.pterm
require_Equal(t, n.catchup.pindex, 2) // n.pindex
require_Equal(t, n.catchup.cterm, aeHeartbeat2.term)
require_Equal(t, n.catchup.cindex, aeHeartbeat2.pindex)
require_False(t, n.Healthy())

// Catchup third message.
n.processAppendEntry(aeMsg3, n.catchup.sub)
require_Equal(t, n.pindex, 3)
require_Equal(t, n.commit, 2)
n.Applied(2)
require_False(t, n.Healthy())

// Heartbeat stops catchup.
n.processAppendEntry(aeHeartbeat2, n.aesub)
require_True(t, n.catchup == nil)
require_Equal(t, n.pindex, 3)
require_Equal(t, n.commit, 3)
require_False(t, n.Healthy())

// Still need to wait for the last entry to be applied.
n.Applied(3)
require_True(t, n.Healthy())
}