Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 33 additions & 30 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1120,8 +1120,8 @@ func (n *raft) Applied(index uint64) (entries uint64, bytes uint64) {
if n.applied > n.papplied {
entries = n.applied - n.papplied
}
if n.bytes > 0 {
bytes = entries * n.bytes / (n.pindex - n.papplied)
if msgs := n.pindex - n.papplied; msgs > 0 {
bytes = entries * n.bytes / msgs
}
return entries, bytes
}
Expand Down Expand Up @@ -3345,6 +3345,10 @@ func (n *raft) truncateWAL(term, index uint64) {
if n.papplied > n.applied {
n.papplied = n.applied
}
// Refresh bytes count after truncate.
var state StreamState
n.wal.FastState(&state)
n.bytes = state.Bytes
}()

if err := n.wal.Truncate(index); err != nil {
Expand Down Expand Up @@ -3401,14 +3405,22 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
var scratch [appendEntryResponseLen]byte
arbuf := scratch[:]

// Grab term from append entry. But if leader explicitly defined its term, use that instead.
// This is required during catchup if the leader catches us up on older items from previous terms.
// While still allowing us to confirm they're matching our highest known term.
lterm := ae.term
if ae.lterm != 0 {
lterm = ae.lterm
}

// Are we receiving from another leader.
if n.State() == Leader {
// If we are the same we should step down to break the tie.
if ae.term >= n.term {
if lterm >= n.term {
// If the append entry term is newer than the current term, erase our
// vote.
if ae.term > n.term {
n.term = ae.term
if lterm > n.term {
n.term = lterm
n.vote = noVote
n.writeTermVote()
}
Expand All @@ -3420,10 +3432,9 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.debug("AppendEntry ignoring old term from another leader")
n.sendRPC(ae.reply, _EMPTY_, ar.encode(arbuf))
arPool.Put(ar)
n.Unlock()
return
}
// Always return here from processing.
n.Unlock()
return
}

// If we received an append entry as a candidate then it would appear that
Expand All @@ -3432,11 +3443,11 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
if n.State() == Candidate {
// If we have a leader in the current term or higher, we should stepdown,
// write the term and vote if the term of the request is higher.
if ae.term >= n.term {
if lterm >= n.term {
// If the append entry term is newer than the current term, erase our
// vote.
if ae.term > n.term {
n.term = ae.term
if lterm > n.term {
n.term = lterm
n.vote = noVote
n.writeTermVote()
}
Expand All @@ -3460,9 +3471,9 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
}
}

// If we are catching up ignore old catchup subs.
// If we are/were catching up ignore old catchup subs.
// This could happen when we stall or cancel a catchup.
if !isNew && catchingUp && sub != n.catchup.sub {
if !isNew && sub != nil && (!catchingUp || sub != n.catchup.sub) {
n.Unlock()
n.debug("AppendEntry ignoring old entry from previous catchup")
return
Expand Down Expand Up @@ -3495,14 +3506,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
}
}

// Grab term from append entry. But if leader explicitly defined its term, use that instead.
// This is required during catchup if the leader catches us up on older items from previous terms.
// While still allowing us to confirm they're matching our highest known term.
lterm := ae.term
if ae.lterm != 0 {
lterm = ae.lterm
}

// If this term is greater than ours.
if lterm > n.term {
n.term = lterm
Expand Down Expand Up @@ -3543,7 +3546,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
// Check if this is a lower or equal index than what we were expecting.
if ae.pindex <= n.pindex {
n.debug("AppendEntry detected pindex less than/equal to ours: [%d:%d] vs [%d:%d]", ae.pterm, ae.pindex, n.pterm, n.pindex)
var ar *appendEntryResponse
var success bool

if ae.pindex < n.commit {
Expand All @@ -3552,10 +3554,10 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
n.debug("AppendEntry pindex %d below commit %d, marking success", ae.pindex, n.commit)
} else if eae, _ := n.loadEntry(ae.pindex); eae == nil {
// If terms are equal, and we are not catching up, we have simply already processed this message.
// So we will ACK back to the leader. This can happen on server restarts based on timings of snapshots.
if ae.pterm == n.pterm && !catchingUp {
// This can happen on server restarts based on timings of snapshots.
if ae.pterm == n.pterm && isNew {
success = true
n.debug("AppendEntry pindex %d already processed, marking success", ae.pindex, n.commit)
n.debug("AppendEntry pindex %d already processed, marking success", ae.pindex)
} else if ae.pindex == n.pindex {
// Check if only our terms do not match here.
// Make sure pterms match and we take on the leader's.
Expand Down Expand Up @@ -3593,12 +3595,12 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {
if !success {
n.cancelCatchup()
}

// Create response.
ar = newAppendEntryResponse(ae.pterm, ae.pindex, n.id, success)
// Intentionally not responding. Otherwise, we could erroneously report "success". Reporting
// non-success is not needed either, and would only be wasting messages.
// For example, if we got partial catchup, and then the "real-time" messages came in very delayed.
// If we reported "success" on those "real-time" messages, we'd wrongfully be providing
// quorum while not having an up-to-date log.
n.Unlock()
n.sendRPC(ae.reply, _EMPTY_, ar.encode(arbuf))
arPool.Put(ar)
return
}

Expand Down Expand Up @@ -3753,6 +3755,7 @@ CONTINUE:

// Only ever respond to new entries.
// Never respond to catchup messages, because providing quorum based on this is unsafe.
// The only way for the leader to receive "success" MUST be through this path.
var ar *appendEntryResponse
if sub != nil && isNew {
ar = newAppendEntryResponse(n.pterm, n.pindex, n.id, true)
Expand Down
113 changes: 113 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2347,6 +2347,7 @@ func TestNRGCatchupDontCountTowardQuorum(t *testing.T) {
sub, err := nc.SubscribeSync(aeReply)
require_NoError(t, err)
defer sub.Drain()
require_NoError(t, nc.Flush())

// Timeline
aeMissedMsg := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries, reply: aeReply})
Expand Down Expand Up @@ -2812,6 +2813,7 @@ func TestNRGInitializeAndScaleUp(t *testing.T) {
sub, err := nc.SubscribeSync(voteReply)
require_NoError(t, err)
defer sub.Drain()
require_NoError(t, nc.Flush())

// Votes on a new leader, and resets notion of "empty vote" to ease getting quorum.
require_NoError(t, n.processVoteRequest(&voteRequest{term: 1, candidate: nats0, reply: voteReply}))
Expand Down Expand Up @@ -3045,6 +3047,117 @@ func TestNRGSizeAndApplied(t *testing.T) {
require_Equal(t, bytes, 0)
}

func TestNRGIgnoreEntryAfterCanceledCatchup(t *testing.T) {
	n, cleanup := initSingleMemRaftNode(t)
	defer cleanup()

	// Payload contents are irrelevant; we only care that entries get stored.
	payload := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
	ents := []*Entry{newEntry(EntryNormal, payload)}

	leaderID := "S1Nunr6R" // "nats-0"
	ae1 := encode(t, &appendEntry{leader: leaderID, term: 1, commit: 0, pterm: 0, pindex: 0, entries: ents})
	ae2 := encode(t, &appendEntry{leader: leaderID, term: 1, commit: 0, pterm: 1, pindex: 1, entries: ents})

	// An entry ahead of our log starts a catchup.
	n.processAppendEntry(ae2, n.aesub)
	require_True(t, n.catchup != nil)

	canceledSub := n.catchup.sub
	n.cancelCatchup()

	// After cancellation, entries arriving on the stale catchup sub must be dropped.
	n.processAppendEntry(ae1, canceledSub)
	require_Equal(t, n.pindex, 0)
}

func TestNRGDelayedMessagesAfterCatchupDontCountTowardQuorum(t *testing.T) {
	n, cleanup := initSingleMemRaftNode(t)
	defer cleanup()

	// Payload contents are irrelevant; we only care that entries get stored.
	payload := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
	ents := []*Entry{newEntry(EntryNormal, payload)}

	replySubj := "$TEST"
	leaderID := "S1Nunr6R" // "nats-0"
	ae1 := encode(t, &appendEntry{leader: leaderID, term: 1, commit: 0, pterm: 0, pindex: 0, entries: ents, reply: replySubj})
	ae2 := encode(t, &appendEntry{leader: leaderID, term: 1, commit: 1, pterm: 1, pindex: 1, entries: ents, reply: replySubj})
	ae3 := encode(t, &appendEntry{leader: leaderID, term: 1, commit: 1, pterm: 1, pindex: 2, entries: ents, reply: replySubj})

	// Receiving an entry ahead of our log triggers a catchup.
	n.processAppendEntry(ae3, n.aesub)
	require_True(t, n.catchup != nil)

	// Let the catchup deliver part of the log.
	n.processAppendEntry(ae1, n.catchup.sub)
	n.processAppendEntry(ae2, n.catchup.sub)
	require_Equal(t, n.pindex, 2)
	require_Equal(t, n.commit, 1)
	n.Applied(1)
	require_Equal(t, n.applied, 1)
	require_NoError(t, n.InstallSnapshot(nil))

	nc, err := nats.Connect(n.s.ClientURL(), nats.UserInfo("admin", "s3cr3t!"))
	require_NoError(t, err)
	defer nc.Close()

	sub, err := nc.SubscribeSync(replySubj)
	require_NoError(t, err)
	defer sub.Drain()
	require_NoError(t, nc.Flush())

	// Delayed "real-time" copies of already-stored entries now arrive.
	// The first must be re-encoded: the original was committed and returned to the pool.
	ae1Copy := encode(t, &appendEntry{leader: leaderID, term: 1, commit: 0, pterm: 0, pindex: 0, entries: ents, reply: replySubj})
	n.processAppendEntry(ae1Copy, n.aesub)
	require_Equal(t, n.pindex, 2)
	// No "success" reply allowed here; it would grant quorum without an up-to-date log.
	_, err = sub.NextMsg(500 * time.Millisecond)
	require_Error(t, err, nats.ErrTimeout)

	n.processAppendEntry(ae2, n.aesub)
	require_Equal(t, n.pindex, 2)
	// Again, a stale delayed entry must not produce a "success" reply.
	_, err = sub.NextMsg(500 * time.Millisecond)
	require_Error(t, err, nats.ErrTimeout)

	n.processAppendEntry(ae3, n.aesub)
	require_Equal(t, n.pindex, 3)
	// This is the newest entry, so a "success" response is expected.
	msg, err := sub.NextMsg(500 * time.Millisecond)
	require_NoError(t, err)
	ar := n.decodeAppendEntryResponse(msg.Data)
	require_Equal(t, ar.index, 3)
	require_True(t, ar.success)
	require_Equal(t, msg.Reply, _EMPTY_)
}

func TestNRGStepdownWithHighestTermDuringCatchup(t *testing.T) {
	n, cleanup := initSingleMemRaftNode(t)
	defer cleanup()

	// Payload contents are irrelevant; we only care that entries get stored.
	payload := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
	ents := []*Entry{newEntry(EntryNormal, payload)}

	leaderID := "S1Nunr6R" // "nats-0"
	ae1 := encode(t, &appendEntry{leader: leaderID, lterm: 10, term: 1, commit: 0, pterm: 0, pindex: 0, entries: ents})
	ae2 := encode(t, &appendEntry{leader: leaderID, lterm: 20, term: 1, commit: 0, pterm: 1, pindex: 1, entries: ents})

	// As a candidate: the entry must be stored, we must step down, and adopt the leader's term.
	n.switchToCandidate()
	require_Equal(t, n.term, 1)
	n.processAppendEntry(ae1, n.aesub)
	require_Equal(t, n.term, 10)
	require_Equal(t, n.pindex, 1)

	// As a leader: same expectations — store the entry, step down, bump the term.
	n.switchToLeader()
	n.processAppendEntry(ae2, n.aesub)
	require_Equal(t, n.term, 20)
	require_Equal(t, n.pindex, 2)
}

// This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before
// proposing the next one.
// The test may fail if:
Expand Down
Loading