diff --git a/server/raft.go b/server/raft.go index d25aba38d25..7ea1ee54c85 100644 --- a/server/raft.go +++ b/server/raft.go @@ -3594,9 +3594,9 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { isNew := sub != nil && sub == n.aesub // If we are/were catching up ignore old catchup subs, but only if catching up from an older server - // that doesn't send the leader term when catching up. We can reject old catchups from newer subs - // later, just by checking the append entry is on the correct term. - if !isNew && sub != nil && ae.lterm == 0 && (!catchingUp || sub != n.catchup.sub) { + // that doesn't send the leader term when catching up or if we would truncate as a result. + // We can reject old catchups from newer subs later, just by checking the append entry is on the correct term. + if !isNew && sub != nil && (ae.lterm == 0 || ae.pindex < n.pindex) && (!catchingUp || sub != n.catchup.sub) { n.Unlock() n.debug("AppendEntry ignoring old entry from previous catchup") return diff --git a/server/raft_test.go b/server/raft_test.go index 86db4fdf7c3..e202237c9b3 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -3696,6 +3696,63 @@ func TestNRGLostQuorum(t *testing.T) { require_False(t, n.lostQuorum()) } +func TestNRGParallelCatchupRollback(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + + aeReply := "$TEST" + nc, err := nats.Connect(n.s.ClientURL(), nats.UserInfo("admin", "s3cr3t!")) + require_NoError(t, err) + defer nc.Close() + + sub, err := nc.SubscribeSync(aeReply) + require_NoError(t, err) + defer sub.Drain() + require_NoError(t, nc.Flush()) + + // Timeline + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, lterm: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, lterm: 1, commit: 0, pterm: 1, pindex: 1, entries: entries}) + aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 2, entries: nil, reply: aeReply}) + + // Trigger a catchup. + n.processAppendEntry(aeMsg2, n.aesub) + require_Equal(t, n.pindex, 0) + require_NotNil(t, n.catchup) + require_Equal(t, n.catchup.cterm, aeMsg2.pterm) + require_Equal(t, n.catchup.cindex, aeMsg2.pindex) + csub := n.catchup.sub + + // Receive the missed messages. + n.processAppendEntry(aeMsg1, csub) + require_Equal(t, n.pindex, 1) + n.processAppendEntry(aeMsg2, csub) + require_Equal(t, n.pindex, 2) + require_True(t, n.catchup == nil) + + // Should respond to the heartbeat and allow the leader to commit. + n.processAppendEntry(aeHeartbeat, n.aesub) + msg, err := sub.NextMsg(time.Second) + require_NoError(t, err) + ar := decodeAppendEntryResponse(msg.Data) + require_NotNil(t, ar) + require_True(t, ar.success) + require_Equal(t, ar.term, aeHeartbeat.term) + require_Equal(t, ar.index, aeHeartbeat.pindex) + + // Now replay a message that was already received as a catchup entry. + // Likely due to running multiple catchups in parallel. + // Since our WAL is already ahead, we should not truncate based on this. + n.processAppendEntry(aeMsg1, csub) + require_Equal(t, n.pindex, 2) +} + // This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before // proposing the next one. // The test may fail if: