Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1204,7 +1204,7 @@ func (n *raft) InstallSnapshot(data []byte) error {
return errNoSnapAvailable
}

n.debug("Installing snapshot of %d bytes", len(data))
n.debug("Installing snapshot of %d bytes [%d:%d]", len(data), term, n.applied)

return n.installSnapshot(&snapshot{
lastTerm: term,
Expand Down Expand Up @@ -2708,7 +2708,7 @@ func (n *raft) loadFirstEntry() (ae *appendEntry, err error) {
func (n *raft) runCatchup(ar *appendEntryResponse, indexUpdatesQ *ipQueue[uint64]) {
n.RLock()
s, reply := n.s, n.areply
peer, subj, term, last := ar.peer, ar.reply, n.term, n.pindex
peer, subj, term, pterm, last := ar.peer, ar.reply, n.term, n.pterm, n.pindex
n.RUnlock()

defer s.grWG.Done()
Expand All @@ -2730,7 +2730,7 @@ func (n *raft) runCatchup(ar *appendEntryResponse, indexUpdatesQ *ipQueue[uint64
indexUpdatesQ.unregister()
}()

n.debug("Running catchup for %q", peer)
n.debug("Running catchup for %q [%d:%d] to [%d:%d]", peer, ar.term, ar.index, pterm, last)

const maxOutstanding = 2 * 1024 * 1024 // 2MB for now.
next, total, om := uint64(0), 0, make(map[uint64]int)
Expand Down Expand Up @@ -3545,29 +3545,39 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
if ae.pterm != n.pterm || ae.pindex != n.pindex {
// Check if this is a lower or equal index than what we were expecting.
if ae.pindex <= n.pindex {
n.debug("AppendEntry detected pindex less than/equal to ours: %d:%d vs %d:%d", ae.pterm, ae.pindex, n.pterm, n.pindex)
n.debug("AppendEntry detected pindex less than/equal to ours: [%d:%d] vs [%d:%d]", ae.pterm, ae.pindex, n.pterm, n.pindex)
var ar *appendEntryResponse
var success bool

if ae.pindex < n.commit {
// If we have already committed this entry, just mark success.
success = true
n.debug("AppendEntry pindex %d below commit %d, marking success", ae.pindex, n.commit)
} else if eae, _ := n.loadEntry(ae.pindex); eae == nil {
// If terms are equal, and we are not catching up, we have simply already processed this message.
// So we will ACK back to the leader. This can happen on server restarts based on timings of snapshots.
if ae.pterm == n.pterm && !catchingUp {
success = true
n.debug("AppendEntry pindex %d already processed, marking success", ae.pindex, n.commit)
} else if ae.pindex == n.pindex {
// Check if only our terms do not match here.
// Make sure pterms match and we take on the leader's.
// This prevents constant spinning.
n.truncateWAL(ae.pterm, ae.pindex)
} else if ae.pindex == n.applied {
// Entry can't be found, this is normal because we have a snapshot at this index.
// Truncate back to where we've created the snapshot.
n.truncateWAL(ae.pterm, ae.pindex)
} else {
n.resetWAL()
snap, err := n.loadLastSnapshot()
if err == nil && snap.lastIndex == ae.pindex && snap.lastTerm == ae.pterm {
// Entry can't be found, this is normal because we have a snapshot at this index.
// Truncate back to where we've created the snapshot.
n.truncateWAL(snap.lastTerm, snap.lastIndex)
// Only continue if truncation was successful, and we ended up such that we can safely continue.
if ae.pterm == n.pterm && ae.pindex == n.pindex {
goto CONTINUE
}
} else {
// Otherwise, something has gone very wrong and we need to reset.
n.resetWAL()
}
}
} else if eae.term == ae.pterm {
// If terms match we can delete all entries past this one, and then continue storing the current entry.
Expand Down Expand Up @@ -3645,7 +3655,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
}

// Setup our state for catching up.
n.debug("AppendEntry did not match %d %d with %d %d", ae.pterm, ae.pindex, n.pterm, n.pindex)
n.debug("AppendEntry did not match [%d:%d] with [%d:%d]", ae.pterm, ae.pindex, n.pterm, n.pindex)
inbox := n.createCatchup(ae)
ar := newAppendEntryResponse(n.pterm, n.pindex, n.id, false)
n.Unlock()
Expand Down
97 changes: 97 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2883,6 +2883,103 @@ func TestNRGInitializeAndScaleUp(t *testing.T) {
require_False(t, vr.empty)
}

func TestNRGReplayOnSnapshotSameTerm(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"

// Timeline
aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 0, pindex: 0, entries: entries})
aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries})
aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 2, entries: entries})

// Process the first append entry.
n.processAppendEntry(aeMsg1, n.aesub)
require_Equal(t, n.pindex, 1)

// Commit and apply.
require_NoError(t, n.applyCommit(1))
require_Equal(t, n.commit, 1)
n.Applied(1)
require_Equal(t, n.applied, 1)

// Install snapshot.
require_NoError(t, n.InstallSnapshot(nil))
snap, err := n.loadLastSnapshot()
require_NoError(t, err)
require_Equal(t, snap.lastIndex, 1)

// Process other messages.
n.processAppendEntry(aeMsg2, n.aesub)
require_Equal(t, n.pindex, 2)
n.processAppendEntry(aeMsg3, n.aesub)
require_Equal(t, n.pindex, 3)

// Replay the append entry that matches our snapshot.
// This can happen as a repeated entry, or a delayed append entry after having already received it in a catchup.
// Should be recognized as a replay with the same term, marked as success, and not truncate.
n.processAppendEntry(aeMsg2, n.aesub)
require_Equal(t, n.pindex, 3)
}

func TestNRGReplayOnSnapshotDifferentTerm(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats0 := "S1Nunr6R" // "nats-0"

// Timeline
aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 0, pindex: 0, entries: entries, lterm: 2})
aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 1, pterm: 1, pindex: 1, entries: entries, lterm: 2})
aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 1, pterm: 2, pindex: 2, entries: entries, lterm: 2})

// Process the first append entry.
n.processAppendEntry(aeMsg1, n.aesub)
require_Equal(t, n.pindex, 1)

// Commit and apply.
require_NoError(t, n.applyCommit(1))
require_Equal(t, n.commit, 1)
n.Applied(1)
require_Equal(t, n.applied, 1)

// Install snapshot.
require_NoError(t, n.InstallSnapshot(nil))
snap, err := n.loadLastSnapshot()
require_NoError(t, err)
require_Equal(t, snap.lastIndex, 1)

// Reset applied to simulate having received the snapshot from
// another leader, and we didn't apply yet since it's async.
n.applied = 0

// Process other messages.
n.processAppendEntry(aeMsg2, n.aesub)
require_Equal(t, n.pindex, 2)
n.processAppendEntry(aeMsg3, n.aesub)
require_Equal(t, n.pindex, 3)

// Replay the append entry that matches our snapshot.
// This can happen as a repeated entry, or a delayed append entry after having already received it in a catchup.
// Should be recognized as truncating back to the installed snapshot, not reset the WAL fully.
// Since all is aligned after truncation, should also be able to apply the entry.
n.processAppendEntry(aeMsg2, n.aesub)
require_Equal(t, n.pindex, 2)

// Should now also be able to apply the third entry.
n.processAppendEntry(aeMsg3, n.aesub)
require_Equal(t, n.pindex, 3)
}

// This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before
// proposing the next one.
// The test may fail if:
Expand Down