diff --git a/server/raft.go b/server/raft.go index dd2f345b423..6567bbff1f2 100644 --- a/server/raft.go +++ b/server/raft.go @@ -1172,9 +1172,12 @@ func (n *raft) InstallSnapshot(data []byte) error { return errNoSnapAvailable } - term := n.pterm + var term uint64 if ae, _ := n.loadEntry(n.applied); ae != nil { term = ae.term + } else { + n.debug("Not snapshotting as entry %d is not available", n.applied) + return errNoSnapAvailable } n.debug("Installing snapshot of %d bytes", len(data)) @@ -3454,6 +3457,10 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { // Make sure pterms match and we take on the leader's. // This prevents constant spinning. n.truncateWAL(ae.pterm, ae.pindex) + } else if ae.pindex == n.applied { + // Entry can't be found, this is normal because we have a snapshot at this index. + // Truncate back to where we've created the snapshot. + n.truncateWAL(ae.pterm, ae.pindex) } else { n.resetWAL() } diff --git a/server/raft_test.go b/server/raft_test.go index ec178511386..903cce57379 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -1523,6 +1523,136 @@ func TestNRGDontRemoveSnapshotIfTruncateToApplied(t *testing.T) { require_Equal(t, len(files), 1) } +func TestNRGSnapshotAndTruncateToApplied(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats1 := "yrzKKRBu" // "nats-1" + nats0 := "S1Nunr6R" // "nats-0" + + // Timeline, other leader + aeMsg1 := encode(t, &appendEntry{leader: nats1, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeMsg2 := encode(t, &appendEntry{leader: nats1, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) + + // Timeline, we temporarily became leader + aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 2, pterm: 1, pindex: 2, entries: nil}) + aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 2, pterm: 1, pindex: 2, entries: entries}) + + // Timeline, old leader is back. + aeHeartbeat2 := encode(t, &appendEntry{leader: nats1, term: 3, commit: 2, pterm: 1, pindex: 2, entries: nil}) + + // Simply receive first message. + n.processAppendEntry(aeMsg1, n.aesub) + require_Equal(t, n.commit, 0) + require_Equal(t, n.wal.State().Msgs, 1) + entry, err := n.loadEntry(1) + require_NoError(t, err) + require_Equal(t, entry.leader, nats1) + + // Receive second message, which commits the first message. + n.processAppendEntry(aeMsg2, n.aesub) + require_Equal(t, n.commit, 1) + require_Equal(t, n.wal.State().Msgs, 2) + entry, err = n.loadEntry(2) + require_NoError(t, err) + require_Equal(t, entry.leader, nats1) + + // Simulate upper layer calling down to apply. + n.Applied(1) + + // Send heartbeat, which commits the second message. + n.processAppendEntryResponse(&appendEntryResponse{ + term: aeHeartbeat1.term, + index: aeHeartbeat1.pindex, + peer: nats1, + reply: _EMPTY_, + success: true, + }) + require_Equal(t, n.commit, 2) + + // Simulate upper layer calling down to apply. + n.Applied(2) + + // Install snapshot and check it exists. + err = n.InstallSnapshot(nil) + require_NoError(t, err) + + snapshots := path.Join(n.sd, snapshotsDir) + files, err := os.ReadDir(snapshots) + require_NoError(t, err) + require_Equal(t, len(files), 1) + require_Equal(t, n.wal.State().Msgs, 0) + + // Store a third message, it stays uncommitted. + require_NoError(t, n.storeToWAL(aeMsg3)) + require_Equal(t, n.commit, 2) + require_Equal(t, n.wal.State().Msgs, 1) + entry, err = n.loadEntry(3) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // Receive heartbeat from new leader, should not lose commits. + n.processAppendEntry(aeHeartbeat2, n.aesub) + require_Equal(t, n.wal.State().Msgs, 0) + require_Equal(t, n.commit, 2) + require_Equal(t, n.applied, 2) +} + +func TestNRGIgnoreDoubleSnapshot(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + + // Timeline + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeHeartbeat := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil}) + aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 1, pterm: 1, pindex: 1, entries: entries}) + + // Simply receive first message. + n.processAppendEntry(aeMsg1, n.aesub) + require_Equal(t, n.pindex, 1) + require_Equal(t, n.commit, 0) + + // Heartbeat moves commit up. + n.processAppendEntry(aeHeartbeat, n.aesub) + require_Equal(t, n.commit, 1) + + // Manually call back down to applied. + n.Applied(1) + + // Second message just for upping the pterm. + n.processAppendEntry(aeMsg2, n.aesub) + require_Equal(t, n.pindex, 2) + require_Equal(t, n.commit, 1) + require_Equal(t, n.pterm, 2) + require_Equal(t, n.term, 2) + + // Snapshot, and confirm state. + err := n.InstallSnapshot(nil) + require_NoError(t, err) + snap, err := n.loadLastSnapshot() + require_NoError(t, err) + require_Equal(t, snap.lastTerm, 1) + require_Equal(t, snap.lastIndex, 1) + + // Snapshot again, should not overwrite previous snapshot. + err = n.InstallSnapshot(nil) + require_Error(t, err, errNoSnapAvailable) + snap, err = n.loadLastSnapshot() + require_NoError(t, err) + require_Equal(t, snap.lastTerm, 1) + require_Equal(t, snap.lastIndex, 1) +} + func TestNRGDontSwitchToCandidateWithInflightSnapshot(t *testing.T) { n, cleanup := initSingleMemRaftNode(t) defer cleanup() @@ -1704,7 +1834,7 @@ func TestNRGTruncateDownToCommitted(t *testing.T) { aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) // Timeline, after leader change - aeMsg3 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 0, pterm: 1, pindex: 1, entries: entries}) + aeMsg3 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 1, pterm: 1, pindex: 1, entries: entries}) aeHeartbeat := encode(t, &appendEntry{leader: nats1, term: 2, commit: 2, pterm: 2, pindex: 2, entries: nil}) // Simply receive first message. @@ -1765,7 +1895,7 @@ func TestNRGTruncateDownToCommittedWhenTruncateFails(t *testing.T) { aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) // Timeline, after leader change - aeMsg3 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 0, pterm: 1, pindex: 1, entries: entries}) + aeMsg3 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 1, pterm: 1, pindex: 1, entries: entries}) // Simply receive first message. n.processAppendEntry(aeMsg1, n.aesub)