From deb6765c1ff364726583d0c4e3ae03a9996edd04 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Thu, 12 Dec 2024 10:22:54 +0100 Subject: [PATCH] NRG: Don't mark current/healthy while catching up Signed-off-by: Maurice van Veen --- server/raft.go | 7 +-- server/raft_test.go | 141 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 143 insertions(+), 5 deletions(-) diff --git a/server/raft.go b/server/raft.go index 9153c147ee0..48b9db2c453 100644 --- a/server/raft.go +++ b/server/raft.go @@ -1422,11 +1422,6 @@ func (n *raft) isCurrent(includeForwardProgress bool) bool { return true } - // Check here on catchup status. - if cs := n.catchup; cs != nil && n.pterm >= cs.cterm && n.pindex >= cs.cindex { - n.cancelCatchup() - } - // Check to see that we have heard from the current leader lately. if n.leader != noLeader && n.leader != n.id && n.catchup == nil { okInterval := int64(hbInterval) * 2 @@ -1437,7 +1432,9 @@ func (n *raft) isCurrent(includeForwardProgress bool) bool { } } if cs := n.catchup; cs != nil { + // We're actively catching up, can't mark current even if commit==applied. n.debug("Not current, still catching up pindex=%d, cindex=%d", n.pindex, cs.cindex) + return false } if n.commit == n.applied { diff --git a/server/raft_test.go b/server/raft_test.go index 3e1aea062c2..54f0cd825ba 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -1700,3 +1700,144 @@ func TestNRGMemoryWALEmptiesSnapshotsDir(t *testing.T) { require_NoError(t, err) require_Len(t, len(files), 0) } + +func TestNRGHealthCheckWaitForCatchup(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. 
+ esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + + // Timeline of append entries sent by the leader; the heartbeat carries no entries. + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) + aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 2, pterm: 1, pindex: 2, entries: entries}) + aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 3, pterm: 1, pindex: 3, entries: nil}) + + // Switch follower into catchup: the heartbeat is at pindex=3 while the follower is still at pindex=0. + n.processAppendEntry(aeHeartbeat, n.aesub) + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 0) // n.pterm + require_Equal(t, n.catchup.pindex, 0) // n.pindex + require_Equal(t, n.catchup.cterm, aeHeartbeat.term) + require_Equal(t, n.catchup.cindex, aeHeartbeat.pindex) + + // Catchup first message. + n.processAppendEntry(aeMsg1, n.catchup.sub) + require_Equal(t, n.pindex, 1) + require_False(t, n.Healthy()) + + // Catchup second message. + n.processAppendEntry(aeMsg2, n.catchup.sub) + require_Equal(t, n.pindex, 2) + require_Equal(t, n.commit, 1) + require_False(t, n.Healthy()) + + // If we apply the entry sooner than we receive the next catchup message, + // should not mark as healthy since we're still in catchup. + n.Applied(1) + require_False(t, n.Healthy()) + + // Catchup third message. + n.processAppendEntry(aeMsg3, n.catchup.sub) + require_Equal(t, n.pindex, 3) + require_Equal(t, n.commit, 2) + n.Applied(2) + require_False(t, n.Healthy()) + + // Heartbeat stops catchup. + n.processAppendEntry(aeHeartbeat, n.aesub) + require_True(t, n.catchup == nil) + require_Equal(t, n.pindex, 3) + require_Equal(t, n.commit, 3) + require_False(t, n.Healthy()) + + // Still need to wait for the last entry to be applied. 
+ n.Applied(3) + require_True(t, n.Healthy()) +} + +func TestNRGHealthCheckWaitForDoubleCatchup(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + + // Timeline of append entries sent by the leader; the heartbeats carry no entries. + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) + aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 2, pterm: 1, pindex: 2, entries: nil}) + aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 2, pterm: 1, pindex: 2, entries: entries}) + aeHeartbeat2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 3, pterm: 1, pindex: 3, entries: nil}) + + // Switch follower into catchup. + n.processAppendEntry(aeHeartbeat1, n.aesub) + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 0) // n.pterm + require_Equal(t, n.catchup.pindex, 0) // n.pindex + require_Equal(t, n.catchup.cterm, aeHeartbeat1.term) + require_Equal(t, n.catchup.cindex, aeHeartbeat1.pindex) + + // Catchup first message. + n.processAppendEntry(aeMsg1, n.catchup.sub) + require_Equal(t, n.pindex, 1) + require_False(t, n.Healthy()) + + // We miss this message, since we're catching up. + n.processAppendEntry(aeMsg3, n.aesub) + require_True(t, n.catchup != nil) + require_Equal(t, n.pindex, 1) + require_False(t, n.Healthy()) + + // We also miss the heartbeat, since we're catching up. + n.processAppendEntry(aeHeartbeat2, n.aesub) + require_True(t, n.catchup != nil) + require_Equal(t, n.pindex, 1) + require_False(t, n.Healthy()) + + // Catchup second message, this reaches the first catchup target (cindex=2) but does not clear catchup yet. 
+ n.processAppendEntry(aeMsg2, n.catchup.sub) + require_Equal(t, n.pindex, 2) + require_Equal(t, n.commit, 1) + n.Applied(1) + require_False(t, n.Healthy()) + + // We expect to still be in catchup, waiting for a heartbeat or new append entry to reset. + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.cterm, aeHeartbeat1.term) + require_Equal(t, n.catchup.cindex, aeHeartbeat1.pindex) + + // We now get a 'future' heartbeat, should restart catchup. + n.processAppendEntry(aeHeartbeat2, n.aesub) + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 1) // n.pterm + require_Equal(t, n.catchup.pindex, 2) // n.pindex + require_Equal(t, n.catchup.cterm, aeHeartbeat2.term) + require_Equal(t, n.catchup.cindex, aeHeartbeat2.pindex) + require_False(t, n.Healthy()) + + // Catchup third message. + n.processAppendEntry(aeMsg3, n.catchup.sub) + require_Equal(t, n.pindex, 3) + require_Equal(t, n.commit, 2) + n.Applied(2) + require_False(t, n.Healthy()) + + // Heartbeat stops catchup. + n.processAppendEntry(aeHeartbeat2, n.aesub) + require_True(t, n.catchup == nil) + require_Equal(t, n.pindex, 3) + require_Equal(t, n.commit, 3) + require_False(t, n.Healthy()) + + // Still need to wait for the last entry to be applied. + n.Applied(3) + require_True(t, n.Healthy()) +}