diff --git a/server/raft.go b/server/raft.go index 6577a60cb36..cbcf9b9e4d6 100644 --- a/server/raft.go +++ b/server/raft.go @@ -3356,6 +3356,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.updateLeadChange(false) } +RETRY: if ae.pterm != n.pterm || ae.pindex != n.pindex { // Check if this is a lower or equal index than what we were expecting. if ae.pindex <= n.pindex { @@ -3363,16 +3364,9 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { var ar *appendEntryResponse var success bool - if n.commit > 0 && ae.pindex <= n.commit { - // Check if only our terms do not match here. - if ae.pindex == n.pindex { - // Make sure pterms match and we take on the leader's. - // This prevents constant spinning. - n.truncateWAL(ae.pterm, ae.pindex) - } else { - // If we have already committed this entry, just mark success. - success = true - } + if ae.pindex < n.commit { + // If we have already committed this entry, just mark success. + success = true } else if eae, _ := n.loadEntry(ae.pindex); eae == nil { // If terms are equal, and we are not catching up, we have simply already processed this message. // So we will ACK back to the leader. This can happen on server restarts based on timings of snapshots. @@ -3386,6 +3380,10 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } else { n.resetWAL() } + } else if eae.term == ae.pterm { + // If terms match we can delete all entries past this one, and then continue storing the current entry. + n.truncateWAL(eae.term, eae.pindex+1) + goto RETRY } else { // If terms mismatched, delete that entry and all others past it. // Make sure to cancel any catchups in progress. diff --git a/server/raft_test.go b/server/raft_test.go index be26deae95d..5af8429436c 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -1274,8 +1274,8 @@ func TestNRGCatchupDoesNotTruncateCommittedEntriesDuringRedelivery(t *testing.T) // Timeline. 
aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) - aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil}) aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) + aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 2, pterm: 1, pindex: 2, entries: nil}) aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 2, entries: entries}) aeHeartbeat2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 3, pterm: 1, pindex: 3, entries: nil}) @@ -1286,10 +1286,6 @@ func TestNRGCatchupDoesNotTruncateCommittedEntriesDuringRedelivery(t *testing.T) require_NoError(t, err) require_Equal(t, entry.leader, nats0) - // Heartbeat, makes sure commit moves up. - n.processAppendEntry(aeHeartbeat1, n.aesub) - require_Equal(t, n.commit, 1) - // Deliver a message. n.processAppendEntry(aeMsg2, n.aesub) require_Equal(t, n.wal.State().Msgs, 2) @@ -1297,6 +1293,10 @@ func TestNRGCatchupDoesNotTruncateCommittedEntriesDuringRedelivery(t *testing.T) require_NoError(t, err) require_Equal(t, entry.leader, nats0) + // Heartbeat, makes sure commit moves up. + n.processAppendEntry(aeHeartbeat1, n.aesub) + require_Equal(t, n.commit, 2) + // Deliver another message. n.processAppendEntry(aeMsg3, n.aesub) require_Equal(t, n.wal.State().Msgs, 3) @@ -1306,49 +1306,13 @@ func TestNRGCatchupDoesNotTruncateCommittedEntriesDuringRedelivery(t *testing.T) // Simulate receiving an old entry as a redelivery. We should not truncate as that lowers our commit. n.processAppendEntry(aeMsg1, n.aesub) - require_Equal(t, n.commit, 1) + require_Equal(t, n.commit, 2) // Heartbeat, makes sure we commit. 
n.processAppendEntry(aeHeartbeat2, n.aesub) require_Equal(t, n.commit, 3) } -func TestNRGNewLeaderWithIncorrectPtermDoesNotTruncateIfCommitted(t *testing.T) { - n, cleanup := initSingleMemRaftNode(t) - defer cleanup() - - // Create a sample entry, the content doesn't matter, just that it's stored. - esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) - entries := []*Entry{newEntry(EntryNormal, esm)} - - nats0 := "S1Nunr6R" // "nats-0" - nats1 := "yrzKKRBu" // "nats-1" - - // Timeline, first leader. - aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) - aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil}) - - // Timeline, leader changed, but pterm got set to term. - aeHeartbeat2 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 1, pterm: 2, pindex: 1, entries: nil}) - - // Initial case is simple, just store the entry. - n.processAppendEntry(aeMsg1, n.aesub) - require_Equal(t, n.wal.State().Msgs, 1) - entry, err := n.loadEntry(1) - require_NoError(t, err) - require_Equal(t, entry.leader, nats0) - - // Heartbeat, makes sure commit moves up. - n.processAppendEntry(aeHeartbeat1, n.aesub) - require_Equal(t, n.commit, 1) - require_Equal(t, n.pterm, 1) - - // Heartbeat from another leader, pterm got set to term, make sure to only up our pterm. - n.processAppendEntry(aeHeartbeat2, n.aesub) - require_Equal(t, n.commit, 1) - require_Equal(t, n.pterm, 2) -} - func TestNRGCatchupFromNewLeaderWithIncorrectPterm(t *testing.T) { n, cleanup := initSingleMemRaftNode(t) defer cleanup() @@ -1395,68 +1359,6 @@ func TestNRGCatchupFromNewLeaderWithIncorrectPterm(t *testing.T) { require_Equal(t, n.commit, 1) } -func TestNRGCatchupFromNewLeaderWithIncorrectPtermDoesNotTruncateIfCommitted(t *testing.T) { - n, cleanup := initSingleMemRaftNode(t) - defer cleanup() - - // Create a sample entry, the content doesn't matter, just that it's stored. 
- esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) - entries := []*Entry{newEntry(EntryNormal, esm)} - - nats0 := "S1Nunr6R" // "nats-0" - nats1 := "yrzKKRBu" // "nats-1" - - // Timeline, first leader. - aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) - aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil}) - - // Timeline, leader changed, but pterm got set to term. - aeMsg2 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 1, pterm: 2, pindex: 1, entries: entries}) - aeHeartbeat2 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 2, pterm: 2, pindex: 2, entries: nil}) - - // Initial case is simple, just store the entry. - n.processAppendEntry(aeMsg1, n.aesub) - require_Equal(t, n.wal.State().Msgs, 1) - entry, err := n.loadEntry(1) - require_NoError(t, err) - require_Equal(t, entry.leader, nats0) - - // Heartbeat, makes sure commit moves up. - n.processAppendEntry(aeHeartbeat1, n.aesub) - require_Equal(t, n.commit, 1) - require_Equal(t, n.pterm, 1) - - // Heartbeat from another leader, we missed a message so we need catchup. - n.processAppendEntry(aeHeartbeat2, n.aesub) - require_Equal(t, n.commit, 1) // Commit should not change, as we missed an item. - require_True(t, n.catchup != nil) - require_Equal(t, n.catchup.pterm, 1) // n.pterm - require_Equal(t, n.catchup.pindex, 1) // n.pindex - - // We get a message with an incorrect pterm, can only correct pterm and requires re-trigger of catchup. - n.processAppendEntry(aeMsg2, n.catchup.sub) - require_True(t, n.catchup == nil) - require_Equal(t, n.pterm, 2) - require_Equal(t, n.pindex, 1) - - // Heartbeat re-triggers catchup. - n.processAppendEntry(aeHeartbeat2, n.aesub) - require_Equal(t, n.commit, 1) // Commit should not change, as we missed an item. 
- require_True(t, n.catchup != nil) - require_Equal(t, n.catchup.pterm, 2) // n.pterm - require_Equal(t, n.catchup.pindex, 1) // n.pindex - - // Now we get the message again and can continue to store it. - n.processAppendEntry(aeMsg2, n.catchup.sub) - require_Equal(t, n.wal.State().Msgs, 2) - require_True(t, n.catchup != nil) - - // Heartbeat can now cancel catchup and move up our commit. - n.processAppendEntry(aeHeartbeat2, n.aesub) - require_Equal(t, n.commit, 2) - require_True(t, n.catchup == nil) -} - func TestNRGDontRemoveSnapshotIfTruncateToApplied(t *testing.T) { n, cleanup := initSingleMemRaftNode(t) defer cleanup() @@ -1667,3 +1569,52 @@ func TestNRGMultipleStopsDontPanic(t *testing.T) { n.Stop() } } + +func TestNRGTruncateDownToCommitted(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + nats1 := "yrzKKRBu" // "nats-1" + + // Timeline, we are leader (entries carry leader nats0, term 1). + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) + + // Timeline, after leader change (entries carry leader nats1, term 2). + aeMsg3 := encode(t, &appendEntry{leader: nats1, term: 2, commit: 0, pterm: 1, pindex: 1, entries: entries}) + aeHeartbeat := encode(t, &appendEntry{leader: nats1, term: 2, commit: 2, pterm: 2, pindex: 2, entries: nil}) + + // Simply receive first message; it is stored but nothing is committed yet. + n.processAppendEntry(aeMsg1, n.aesub) + require_Equal(t, n.commit, 0) + require_Equal(t, n.wal.State().Msgs, 1) + entry, err := n.loadEntry(1) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // Receive second message, which commits the first message. 
+ n.processAppendEntry(aeMsg2, n.aesub) + require_Equal(t, n.commit, 1) + require_Equal(t, n.wal.State().Msgs, 2) + entry, err = n.loadEntry(2) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // We receive an entry from another leader, should truncate down to commit / remove the second message. + // After doing so, we should also be able to immediately store the message after. + n.processAppendEntry(aeMsg3, n.aesub) + require_Equal(t, n.commit, 1) + require_Equal(t, n.wal.State().Msgs, 2) + entry, err = n.loadEntry(2) + require_NoError(t, err) + require_Equal(t, entry.leader, nats1) + + // Heartbeat from the new leader moves commit up to 2. + n.processAppendEntry(aeHeartbeat, n.aesub) + require_Equal(t, n.commit, 2) +}