Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 9 additions & 30 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -2652,9 +2652,6 @@ func (n *raft) runAsLeader() {
n.unsubscribe(rpsub)
n.Unlock()
}()

// To send out our initial peer state.
n.sendPeerState()
n.Unlock()

hb := time.NewTicker(hbInterval)
Expand Down Expand Up @@ -4627,37 +4624,19 @@ func (n *raft) switchToLeader() {
}

n.Lock()
defer n.Unlock()

n.debug("Switching to leader")

// Check if we have items pending as we are taking over.
sendHB := n.pindex > n.commit

n.lxfer = false
n.updateLeader(n.id)
leadChange := n.switchState(Leader)

if leadChange {
// Wait for messages to be applied if we've stored more, otherwise signal immediately.
// It's important to wait signaling we're leader if we're not up-to-date yet, as that
// would mean we're in a consistent state compared with the previous leader.
if n.pindex > n.applied {
n.aflr = n.pindex
} else {
// We know we have applied all entries in our log and can signal immediately.
// For sanity reset applied floor back down to 0, so we aren't able to signal twice.
n.aflr = 0
if !n.leaderState.Swap(true) {
// Only update timestamp if leader state actually changed.
nowts := time.Now().UTC()
n.leaderSince.Store(&nowts)
}
n.updateLeadChange(true)
}
}
n.Unlock()
n.switchState(Leader)

if sendHB {
n.sendHeartbeat()
}
// To send out our initial peer state.
// In our implementation this is equivalent to sending a NOOP-entry upon becoming leader.
// Wait for this message (and potentially more) to be applied.
// It's important to hold off on signaling that we're leader until we're up-to-date, as only
// then are we in a consistent state compared with the previous leader.
n.sendPeerState()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably only want this to happen if n.switchState() returns success?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is necessary. We only call `n.switchToLeader()` if we previously ran as candidate and won the vote, which means `n.switchState(Leader)` will always return true. I'm not sure why I added the `if leadChange` check before.

n.aflr = n.pindex
}
89 changes: 50 additions & 39 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1570,11 +1570,11 @@ func TestNRGSnapshotAndTruncateToApplied(t *testing.T) {
aeMsg2 := encode(t, &appendEntry{leader: nats1, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries})

// Timeline, we temporarily became leader
aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 2, pterm: 1, pindex: 2, entries: nil})
aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 2, pterm: 1, pindex: 2, entries: entries})
aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 3, pterm: 1, pindex: 3, entries: nil})
aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 3, pterm: 1, pindex: 3, entries: entries})

// Timeline, old leader is back.
aeHeartbeat2 := encode(t, &appendEntry{leader: nats1, term: 3, commit: 2, pterm: 1, pindex: 2, entries: nil})
aeHeartbeat2 := encode(t, &appendEntry{leader: nats1, term: 3, commit: 3, pterm: 1, pindex: 3, entries: nil})

// Simply receive first message.
n.processAppendEntry(aeMsg1, n.aesub)
Expand Down Expand Up @@ -1604,10 +1604,10 @@ func TestNRGSnapshotAndTruncateToApplied(t *testing.T) {
reply: _EMPTY_,
success: true,
})
require_Equal(t, n.commit, 2)
require_Equal(t, n.commit, 3)

// Simulate upper layer calling down to apply.
n.Applied(2)
n.Applied(3)

// Install snapshot and check it exists.
err = n.InstallSnapshot(nil)
Expand All @@ -1621,18 +1621,18 @@ func TestNRGSnapshotAndTruncateToApplied(t *testing.T) {

// Store a third message, it stays uncommitted.
require_NoError(t, n.storeToWAL(aeMsg3))
require_Equal(t, n.commit, 2)
require_Equal(t, n.commit, 3)
require_Equal(t, n.wal.State().Msgs, 1)
entry, err = n.loadEntry(3)
entry, err = n.loadEntry(4)
require_NoError(t, err)
require_Equal(t, entry.leader, nats0)

// Receive heartbeat from new leader, should not lose commits.
n.stepdown(noLeader)
n.processAppendEntry(aeHeartbeat2, n.aesub)
require_Equal(t, n.wal.State().Msgs, 0)
require_Equal(t, n.commit, 2)
require_Equal(t, n.applied, 2)
require_Equal(t, n.commit, 3)
require_Equal(t, n.applied, 3)
}

func TestNRGIgnoreDoubleSnapshot(t *testing.T) {
Expand Down Expand Up @@ -2212,7 +2212,7 @@ func TestNRGHealthCheckWaitForPendingCommitsWhenPaused(t *testing.T) {
require_True(t, n.Healthy())
}

func TestNRGHeartbeatCanEstablishQuorumAfterLeaderChange(t *testing.T) {
func TestNRGAppendEntryCanEstablishQuorumAfterLeaderChange(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()

Expand All @@ -2224,7 +2224,7 @@ func TestNRGHeartbeatCanEstablishQuorumAfterLeaderChange(t *testing.T) {

// Timeline
aeMsg := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
aeHeartbeatResponse := &appendEntryResponse{term: 1, index: 1, peer: nats0, success: true}
aeHeartbeatResponse := &appendEntryResponse{term: 1, index: 2, peer: nats0, success: true}

// Process first message.
n.processAppendEntry(aeMsg, n.aesub)
Expand All @@ -2234,27 +2234,23 @@ func TestNRGHeartbeatCanEstablishQuorumAfterLeaderChange(t *testing.T) {
// Simulate becoming leader, and not knowing if the stored entry has quorum and can be committed.
// Switching to leader should send a heartbeat.
n.switchToLeader()
require_Equal(t, n.aflr, 1)
require_Equal(t, n.aflr, 2)
require_Equal(t, n.commit, 0)

// We simulate receiving the successful heartbeat response here. It should move the commit up.
n.processAppendEntryResponse(aeHeartbeatResponse)
require_Equal(t, n.commit, 1)
require_Equal(t, n.aflr, 1)
require_Equal(t, n.commit, 2)
require_Equal(t, n.aflr, 2)

// Once the entry is applied, it should reset the applied floor.
n.Applied(1)
n.Applied(2)
require_Equal(t, n.aflr, 0)
}

func TestNRGQuorumAccounting(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats1 := "yrzKKRBu" // "nats-1"
nats2 := "cnrtt3eg" // "nats-2"

Expand All @@ -2267,10 +2263,8 @@ func TestNRGQuorumAccounting(t *testing.T) {
require_Equal(t, n.csz, 5)
require_Equal(t, n.qn, 3)

// Switch this node to leader, and send an entry.
// Switch this node to leader which sends an entry.
n.switchToLeader()
require_Equal(t, n.pindex, 0)
n.sendAppendEntry(entries)
require_Equal(t, n.pindex, 1)

// The first response MUST NOT indicate quorum has been reached.
Expand All @@ -2286,10 +2280,6 @@ func TestNRGRevalidateQuorumAfterLeaderChange(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

nats1 := "yrzKKRBu" // "nats-1"
nats2 := "cnrtt3eg" // "nats-2"

Expand All @@ -2302,12 +2292,10 @@ func TestNRGRevalidateQuorumAfterLeaderChange(t *testing.T) {
require_Equal(t, n.csz, 5)
require_Equal(t, n.qn, 3)

// Switch this node to leader, and send an entry.
// Switch this node to leader which sends an entry.
n.term++
n.switchToLeader()
require_Equal(t, n.term, 1)
require_Equal(t, n.pindex, 0)
n.sendAppendEntry(entries)
require_Equal(t, n.pindex, 1)

// We have one server that signals the message was stored. The leader will add 1 to the acks count.
Expand Down Expand Up @@ -2354,10 +2342,10 @@ func TestNRGSignalLeadChangeFalseIfCampaignImmediately(t *testing.T) {
n.switchToCandidate()
n.switchToLeader()
select {
case isLeader := <-n.LeadChangeC():
require_True(t, isLeader)
case <-n.LeadChangeC():
t.Error("Expected no leadChange signal")
default:
t.Error("Expected leadChange signal")
// Expecting no signal yet.
}
},
},
Expand Down Expand Up @@ -2460,16 +2448,10 @@ func TestNRGIgnoreTrackResponseWhenNotLeader(t *testing.T) {
n, cleanup := initSingleMemRaftNode(t)
defer cleanup()

// Create a sample entry, the content doesn't matter, just that it's stored.
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}

// Switch this node to leader, and send two entries. The first will get quorum, the second will not.
// Switch this node to leader which sends an entry.
n.term++
n.switchToLeader()
require_Equal(t, n.term, 1)
require_Equal(t, n.pindex, 0)
n.sendAppendEntry(entries)
require_Equal(t, n.pindex, 1)
require_Equal(t, n.pterm, 1)
require_Equal(t, n.commit, 0)
Expand Down Expand Up @@ -3753,6 +3735,35 @@ func TestNRGParallelCatchupRollback(t *testing.T) {
require_Equal(t, n.pindex, 2)
}

// TestNRGReportLeaderAfterNoopEntry checks that a node which switches to leader
// does not report itself as leader right away, but only once the entry it sent
// out upon taking over has been committed and applied.
func TestNRGReportLeaderAfterNoopEntry(t *testing.T) {
	node, teardown := initSingleMemRaftNode(t)
	defer teardown()

	// Starting point: a follower in term 0 that does not claim leadership.
	require_Equal(t, node.State(), Follower)
	require_Equal(t, node.term, 0)
	require_False(t, node.Leader())

	// Becoming a candidate moves us to term 1, still without claiming leadership.
	node.switchToCandidate()
	require_Equal(t, node.State(), Candidate)
	require_Equal(t, node.term, 1)
	require_False(t, node.Leader())

	// After switching to leader the state is Leader, but nothing has been
	// applied yet, so we must not report being leader at this point.
	node.switchToLeader()
	require_Equal(t, node.State(), Leader)
	require_Equal(t, node.term, 1)
	require_Equal(t, node.pindex, 1) // Should've sent a NOOP-entry to establish leadership.
	require_Equal(t, node.applied, 0)
	require_False(t, node.Leader())

	// Once that entry is committed and applied, we should start reporting that we're leader.
	node.commit = 1
	node.Applied(1)
	require_Equal(t, node.applied, 1)
	require_True(t, node.Leader())
}

// This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before
// proposing the next one.
// The test may fail if:
Expand Down