diff --git a/server/raft.go b/server/raft.go index 296a9df378f..e3e4f622839 100644 --- a/server/raft.go +++ b/server/raft.go @@ -1120,8 +1120,8 @@ func (n *raft) Applied(index uint64) (entries uint64, bytes uint64) { if n.applied > n.papplied { entries = n.applied - n.papplied } - if n.bytes > 0 { - bytes = entries * n.bytes / (n.pindex - n.papplied) + if msgs := n.pindex - n.papplied; msgs > 0 { + bytes = entries * n.bytes / msgs } return entries, bytes } @@ -3345,6 +3345,10 @@ func (n *raft) truncateWAL(term, index uint64) { if n.papplied > n.applied { n.papplied = n.applied } + // Refresh bytes count after truncate. + var state StreamState + n.wal.FastState(&state) + n.bytes = state.Bytes }() if err := n.wal.Truncate(index); err != nil { @@ -3401,14 +3405,22 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { var scratch [appendEntryResponseLen]byte arbuf := scratch[:] + // Grab term from append entry. But if leader explicitly defined its term, use that instead. + // This is required during catchup if the leader catches us up on older items from previous terms. + // While still allowing us to confirm they're matching our highest known term. + lterm := ae.term + if ae.lterm != 0 { + lterm = ae.lterm + } + // Are we receiving from another leader. if n.State() == Leader { // If we are the same we should step down to break the tie. - if ae.term >= n.term { + if lterm >= n.term { // If the append entry term is newer than the current term, erase our // vote. - if ae.term > n.term { - n.term = ae.term + if lterm > n.term { + n.term = lterm n.vote = noVote n.writeTermVote() } @@ -3420,10 +3432,9 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.debug("AppendEntry ignoring old term from another leader") n.sendRPC(ae.reply, _EMPTY_, ar.encode(arbuf)) arPool.Put(ar) + n.Unlock() + return } - // Always return here from processing. - n.Unlock() - return } // If we received an append entry as a candidate then it would appear that @@ -3432,11 +3443,11 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { if n.State() == Candidate { // If we have a leader in the current term or higher, we should stepdown, // write the term and vote if the term of the request is higher. - if ae.term >= n.term { + if lterm >= n.term { // If the append entry term is newer than the current term, erase our // vote. - if ae.term > n.term { - n.term = ae.term + if lterm > n.term { + n.term = lterm n.vote = noVote n.writeTermVote() } @@ -3460,9 +3471,9 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } } - // If we are catching up ignore old catchup subs. + // If we are/were catching up ignore old catchup subs. // This could happen when we stall or cancel a catchup. - if !isNew && catchingUp && sub != n.catchup.sub { + if !isNew && sub != nil && (!catchingUp || sub != n.catchup.sub) { n.Unlock() n.debug("AppendEntry ignoring old entry from previous catchup") return @@ -3495,14 +3506,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } } - // Grab term from append entry. But if leader explicitly defined its term, use that instead. - // This is required during catchup if the leader catches us up on older items from previous terms. - // While still allowing us to confirm they're matching our highest known term. - lterm := ae.term - if ae.lterm != 0 { - lterm = ae.lterm - } - // If this term is greater than ours. if lterm > n.term { n.term = lterm @@ -3543,7 +3546,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { // Check if this is a lower or equal index than what we were expecting. if ae.pindex <= n.pindex { n.debug("AppendEntry detected pindex less than/equal to ours: [%d:%d] vs [%d:%d]", ae.pterm, ae.pindex, n.pterm, n.pindex) - var ar *appendEntryResponse var success bool if ae.pindex < n.commit { @@ -3552,10 +3554,10 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { n.debug("AppendEntry pindex %d below commit %d, marking success", ae.pindex, n.commit) } else if eae, _ := n.loadEntry(ae.pindex); eae == nil { // If terms are equal, and we are not catching up, we have simply already processed this message. - // So we will ACK back to the leader. This can happen on server restarts based on timings of snapshots. - if ae.pterm == n.pterm && !catchingUp { + // This can happen on server restarts based on timings of snapshots. + if ae.pterm == n.pterm && isNew { success = true - n.debug("AppendEntry pindex %d already processed, marking success", ae.pindex, n.commit) + n.debug("AppendEntry pindex %d already processed, marking success", ae.pindex) } else if ae.pindex == n.pindex { // Check if only our terms do not match here. // Make sure pterms match and we take on the leader's. @@ -3593,12 +3595,12 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { if !success { n.cancelCatchup() } - - // Create response. - ar = newAppendEntryResponse(ae.pterm, ae.pindex, n.id, success) + // Intentionally not responding. Otherwise, we could erroneously report "success". Reporting + // non-success is not needed either, and would only be wasting messages. + // For example, if we got partial catchup, and then the "real-time" messages came in very delayed. + // If we reported "success" on those "real-time" messages, we'd wrongfully be providing + // quorum while not having an up-to-date log. n.Unlock() - n.sendRPC(ae.reply, _EMPTY_, ar.encode(arbuf)) - arPool.Put(ar) return } @@ -3753,6 +3755,7 @@ CONTINUE: // Only ever respond to new entries. // Never respond to catchup messages, because providing quorum based on this is unsafe. + // The only way for the leader to receive "success" MUST be through this path. var ar *appendEntryResponse if sub != nil && isNew { ar = newAppendEntryResponse(n.pterm, n.pindex, n.id, true) diff --git a/server/raft_test.go b/server/raft_test.go index b5b45282e35..d6a9359b28f 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -2347,6 +2347,7 @@ func TestNRGCatchupDontCountTowardQuorum(t *testing.T) { sub, err := nc.SubscribeSync(aeReply) require_NoError(t, err) defer sub.Drain() + require_NoError(t, nc.Flush()) // Timeline aeMissedMsg := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries, reply: aeReply}) @@ -2812,6 +2813,7 @@ func TestNRGInitializeAndScaleUp(t *testing.T) { sub, err := nc.SubscribeSync(voteReply) require_NoError(t, err) defer sub.Drain() + require_NoError(t, nc.Flush()) // Votes on a new leader, and resets notion of "empty vote" to ease getting quorum. require_NoError(t, n.processVoteRequest(&voteRequest{term: 1, candidate: nats0, reply: voteReply})) @@ -3045,6 +3047,117 @@ func TestNRGSizeAndApplied(t *testing.T) { require_Equal(t, bytes, 0) } +func TestNRGIgnoreEntryAfterCanceledCatchup(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 1, entries: entries}) + + n.processAppendEntry(aeMsg2, n.aesub) + require_True(t, n.catchup != nil) + + csub := n.catchup.sub + n.cancelCatchup() + + // Catchup was canceled, a message on this canceled catchup should not be stored. + n.processAppendEntry(aeMsg1, csub) + require_Equal(t, n.pindex, 0) +} + +func TestNRGDelayedMessagesAfterCatchupDontCountTowardQuorum(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + aeReply := "$TEST" + nats0 := "S1Nunr6R" // "nats-0" + aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries, reply: aeReply}) + aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries, reply: aeReply}) + aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 2, entries: entries, reply: aeReply}) + + // Triggers catchup. + n.processAppendEntry(aeMsg3, n.aesub) + require_True(t, n.catchup != nil) + + // Catchup runs partially. + n.processAppendEntry(aeMsg1, n.catchup.sub) + n.processAppendEntry(aeMsg2, n.catchup.sub) + require_Equal(t, n.pindex, 2) + require_Equal(t, n.commit, 1) + n.Applied(1) + require_Equal(t, n.applied, 1) + require_NoError(t, n.InstallSnapshot(nil)) + + nc, err := nats.Connect(n.s.ClientURL(), nats.UserInfo("admin", "s3cr3t!")) + require_NoError(t, err) + defer nc.Close() + + sub, err := nc.SubscribeSync(aeReply) + require_NoError(t, err) + defer sub.Drain() + require_NoError(t, nc.Flush()) + + // We now receive delayed "real-time" messages. + // The first message needs to be a copy, because we've committed it before and returned it to the pool. + aeMsg1Copy := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries, reply: aeReply}) + n.processAppendEntry(aeMsg1Copy, n.aesub) + require_Equal(t, n.pindex, 2) + // Should NOT reply "success", otherwise we would wrongfully provide quorum while not having an up-to-date log. + _, err = sub.NextMsg(500 * time.Millisecond) + require_Error(t, err, nats.ErrTimeout) + + n.processAppendEntry(aeMsg2, n.aesub) + require_Equal(t, n.pindex, 2) + // Should NOT reply "success", otherwise we would wrongfully provide quorum while not having an up-to-date log. + _, err = sub.NextMsg(500 * time.Millisecond) + require_Error(t, err, nats.ErrTimeout) + + n.processAppendEntry(aeMsg3, n.aesub) + require_Equal(t, n.pindex, 3) + // Should reply "success", this is the latest message. + msg, err := sub.NextMsg(500 * time.Millisecond) + require_NoError(t, err) + ar := n.decodeAppendEntryResponse(msg.Data) + require_Equal(t, ar.index, 3) + require_True(t, ar.success) + require_Equal(t, msg.Reply, _EMPTY_) +} + +func TestNRGStepdownWithHighestTermDuringCatchup(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + aeMsg1 := encode(t, &appendEntry{leader: nats0, lterm: 10, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeMsg2 := encode(t, &appendEntry{leader: nats0, lterm: 20, term: 1, commit: 0, pterm: 1, pindex: 1, entries: entries}) + + // Need to store the message, stepdown, and up term. + n.switchToCandidate() + require_Equal(t, n.term, 1) + n.processAppendEntry(aeMsg1, n.aesub) + require_Equal(t, n.term, 10) + require_Equal(t, n.pindex, 1) + + // Need to store the message, stepdown, and up term. + n.switchToLeader() + n.processAppendEntry(aeMsg2, n.aesub) + require_Equal(t, n.term, 20) + require_Equal(t, n.pindex, 2) +} + // This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before // proposing the next one. // The test may fail if: