diff --git a/server/raft.go b/server/raft.go index 9e8bff9af01..4ac669a9b39 100644 --- a/server/raft.go +++ b/server/raft.go @@ -1204,7 +1204,7 @@ func (n *raft) InstallSnapshot(data []byte) error { return errNoSnapAvailable } - n.debug("Installing snapshot of %d bytes", len(data)) + n.debug("Installing snapshot of %d bytes [%d:%d]", len(data), term, n.applied) return n.installSnapshot(&snapshot{ lastTerm: term, @@ -2708,7 +2708,7 @@ func (n *raft) loadFirstEntry() (ae *appendEntry, err error) { func (n *raft) runCatchup(ar *appendEntryResponse, indexUpdatesQ *ipQueue[uint64]) { n.RLock() s, reply := n.s, n.areply - peer, subj, term, last := ar.peer, ar.reply, n.term, n.pindex + peer, subj, term, pterm, last := ar.peer, ar.reply, n.term, n.pterm, n.pindex n.RUnlock() defer s.grWG.Done() @@ -2730,7 +2730,7 @@ func (n *raft) runCatchup(ar *appendEntryResponse, indexUpdatesQ *ipQueue[uint64 indexUpdatesQ.unregister() }() - n.debug("Running catchup for %q", peer) + n.debug("Running catchup for %q [%d:%d] to [%d:%d]", peer, ar.term, ar.index, pterm, last) const maxOutstanding = 2 * 1024 * 1024 // 2MB for now. next, total, om := uint64(0), 0, make(map[uint64]int) @@ -3545,29 +3545,39 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { if ae.pterm != n.pterm || ae.pindex != n.pindex { // Check if this is a lower or equal index than what we were expecting. if ae.pindex <= n.pindex { - n.debug("AppendEntry detected pindex less than/equal to ours: %d:%d vs %d:%d", ae.pterm, ae.pindex, n.pterm, n.pindex) + n.debug("AppendEntry detected pindex less than/equal to ours: [%d:%d] vs [%d:%d]", ae.pterm, ae.pindex, n.pterm, n.pindex) var ar *appendEntryResponse var success bool if ae.pindex < n.commit { // If we have already committed this entry, just mark success. 
success = true + n.debug("AppendEntry pindex %d below commit %d, marking success", ae.pindex, n.commit) } else if eae, _ := n.loadEntry(ae.pindex); eae == nil { // If terms are equal, and we are not catching up, we have simply already processed this message. // So we will ACK back to the leader. This can happen on server restarts based on timings of snapshots. if ae.pterm == n.pterm && !catchingUp { success = true + n.debug("AppendEntry pindex %d already processed, marking success", ae.pindex) } else if ae.pindex == n.pindex { // Check if only our terms do not match here. // Make sure pterms match and we take on the leader's. // This prevents constant spinning. n.truncateWAL(ae.pterm, ae.pindex) - } else if ae.pindex == n.applied { - // Entry can't be found, this is normal because we have a snapshot at this index. - // Truncate back to where we've created the snapshot. - n.truncateWAL(ae.pterm, ae.pindex) } else { - n.resetWAL() + snap, err := n.loadLastSnapshot() + if err == nil && snap.lastIndex == ae.pindex && snap.lastTerm == ae.pterm { + // Entry can't be found, this is normal because we have a snapshot at this index. + // Truncate back to where we've created the snapshot. + n.truncateWAL(snap.lastTerm, snap.lastIndex) + // Only continue if truncation was successful, and we ended up such that we can safely continue. + if ae.pterm == n.pterm && ae.pindex == n.pindex { + goto CONTINUE + } + } else { + // Otherwise, something has gone very wrong and we need to reset. + n.resetWAL() + } } } else if eae.term == ae.pterm { // If terms match we can delete all entries past this one, and then continue storing the current entry. @@ -3645,7 +3655,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } // Setup our state for catching up. 
- n.debug("AppendEntry did not match %d %d with %d %d", ae.pterm, ae.pindex, n.pterm, n.pindex) + n.debug("AppendEntry did not match [%d:%d] with [%d:%d]", ae.pterm, ae.pindex, n.pterm, n.pindex) inbox := n.createCatchup(ae) ar := newAppendEntryResponse(n.pterm, n.pindex, n.id, false) n.Unlock() diff --git a/server/raft_test.go b/server/raft_test.go index b15345dd36d..9a7aa13b551 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -2883,6 +2883,103 @@ func TestNRGInitializeAndScaleUp(t *testing.T) { require_False(t, vr.empty) } +func TestNRGReplayOnSnapshotSameTerm(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + + // Timeline + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 0, pindex: 0, entries: entries}) + aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) + aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 2, entries: entries}) + + // Process the first append entry. + n.processAppendEntry(aeMsg1, n.aesub) + require_Equal(t, n.pindex, 1) + + // Commit and apply. + require_NoError(t, n.applyCommit(1)) + require_Equal(t, n.commit, 1) + n.Applied(1) + require_Equal(t, n.applied, 1) + + // Install snapshot. + require_NoError(t, n.InstallSnapshot(nil)) + snap, err := n.loadLastSnapshot() + require_NoError(t, err) + require_Equal(t, snap.lastIndex, 1) + + // Process other messages. + n.processAppendEntry(aeMsg2, n.aesub) + require_Equal(t, n.pindex, 2) + n.processAppendEntry(aeMsg3, n.aesub) + require_Equal(t, n.pindex, 3) + + // Replay the append entry that matches our snapshot. 
+ // This can happen as a repeated entry, or a delayed append entry after having already received it in a catchup. + // Should be recognized as a replay with the same term, marked as success, and not truncate. + n.processAppendEntry(aeMsg2, n.aesub) + require_Equal(t, n.pindex, 3) +} + +func TestNRGReplayOnSnapshotDifferentTerm(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + + // Timeline + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 0, pindex: 0, entries: entries, lterm: 2}) + aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 1, pterm: 1, pindex: 1, entries: entries, lterm: 2}) + aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 1, pterm: 2, pindex: 2, entries: entries, lterm: 2}) + + // Process the first append entry. + n.processAppendEntry(aeMsg1, n.aesub) + require_Equal(t, n.pindex, 1) + + // Commit and apply. + require_NoError(t, n.applyCommit(1)) + require_Equal(t, n.commit, 1) + n.Applied(1) + require_Equal(t, n.applied, 1) + + // Install snapshot. + require_NoError(t, n.InstallSnapshot(nil)) + snap, err := n.loadLastSnapshot() + require_NoError(t, err) + require_Equal(t, snap.lastIndex, 1) + + // Reset applied to simulate having received the snapshot from + // another leader, and we didn't apply yet since it's async. + n.applied = 0 + + // Process other messages. + n.processAppendEntry(aeMsg2, n.aesub) + require_Equal(t, n.pindex, 2) + n.processAppendEntry(aeMsg3, n.aesub) + require_Equal(t, n.pindex, 3) + + // Replay the append entry that matches our snapshot. + // This can happen as a repeated entry, or a delayed append entry after having already received it in a catchup. 
+ // Should be recognized as truncating back to the installed snapshot, not reset the WAL fully. + // Since all is aligned after truncation, should also be able to apply the entry. + n.processAppendEntry(aeMsg2, n.aesub) + require_Equal(t, n.pindex, 2) + + // Should now also be able to apply the third entry. + n.processAppendEntry(aeMsg3, n.aesub) + require_Equal(t, n.pindex, 3) +} + // This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before // proposing the next one. // The test may fail if: