diff --git a/server/raft.go b/server/raft.go index 7ea1ee54c85..163d3e6e061 100644 --- a/server/raft.go +++ b/server/raft.go @@ -2652,9 +2652,6 @@ func (n *raft) runAsLeader() { n.unsubscribe(rpsub) n.Unlock() }() - - // To send out our initial peer state. - n.sendPeerState() n.Unlock() hb := time.NewTicker(hbInterval) @@ -4627,37 +4624,19 @@ func (n *raft) switchToLeader() { } n.Lock() + defer n.Unlock() n.debug("Switching to leader") - // Check if we have items pending as we are taking over. - sendHB := n.pindex > n.commit - n.lxfer = false n.updateLeader(n.id) - leadChange := n.switchState(Leader) - - if leadChange { - // Wait for messages to be applied if we've stored more, otherwise signal immediately. - // It's important to wait signaling we're leader if we're not up-to-date yet, as that - // would mean we're in a consistent state compared with the previous leader. - if n.pindex > n.applied { - n.aflr = n.pindex - } else { - // We know we have applied all entries in our log and can signal immediately. - // For sanity reset applied floor back down to 0, so we aren't able to signal twice. - n.aflr = 0 - if !n.leaderState.Swap(true) { - // Only update timestamp if leader state actually changed. - nowts := time.Now().UTC() - n.leaderSince.Store(&nowts) - } - n.updateLeadChange(true) - } - } - n.Unlock() + n.switchState(Leader) - if sendHB { - n.sendHeartbeat() - } + // To send out our initial peer state. + // In our implementation this is equivalent to sending a NOOP-entry upon becoming leader. + // Wait for this message (and potentially more) to be applied. + // It's important to wait before signaling we're leader if we're not up-to-date yet, as that + // would mean we're in a consistent state compared with the previous leader.
+ n.sendPeerState() + n.aflr = n.pindex } diff --git a/server/raft_test.go b/server/raft_test.go index e202237c9b3..2ae33c73b6f 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -1570,11 +1570,11 @@ func TestNRGSnapshotAndTruncateToApplied(t *testing.T) { aeMsg2 := encode(t, &appendEntry{leader: nats1, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) // Timeline, we temporarily became leader - aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 2, pterm: 1, pindex: 2, entries: nil}) - aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 2, pterm: 1, pindex: 2, entries: entries}) + aeHeartbeat1 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 3, pterm: 1, pindex: 3, entries: nil}) + aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 2, commit: 3, pterm: 1, pindex: 3, entries: entries}) // Timeline, old leader is back. - aeHeartbeat2 := encode(t, &appendEntry{leader: nats1, term: 3, commit: 2, pterm: 1, pindex: 2, entries: nil}) + aeHeartbeat2 := encode(t, &appendEntry{leader: nats1, term: 3, commit: 3, pterm: 1, pindex: 3, entries: nil}) // Simply receive first message. n.processAppendEntry(aeMsg1, n.aesub) @@ -1604,10 +1604,10 @@ func TestNRGSnapshotAndTruncateToApplied(t *testing.T) { reply: _EMPTY_, success: true, }) - require_Equal(t, n.commit, 2) + require_Equal(t, n.commit, 3) // Simulate upper layer calling down to apply. - n.Applied(2) + n.Applied(3) // Install snapshot and check it exists. err = n.InstallSnapshot(nil) @@ -1621,9 +1621,9 @@ func TestNRGSnapshotAndTruncateToApplied(t *testing.T) { // Store a third message, it stays uncommitted. 
require_NoError(t, n.storeToWAL(aeMsg3)) - require_Equal(t, n.commit, 2) + require_Equal(t, n.commit, 3) require_Equal(t, n.wal.State().Msgs, 1) - entry, err = n.loadEntry(3) + entry, err = n.loadEntry(4) require_NoError(t, err) require_Equal(t, entry.leader, nats0) @@ -1631,8 +1631,8 @@ func TestNRGSnapshotAndTruncateToApplied(t *testing.T) { n.stepdown(noLeader) n.processAppendEntry(aeHeartbeat2, n.aesub) require_Equal(t, n.wal.State().Msgs, 0) - require_Equal(t, n.commit, 2) - require_Equal(t, n.applied, 2) + require_Equal(t, n.commit, 3) + require_Equal(t, n.applied, 3) } func TestNRGIgnoreDoubleSnapshot(t *testing.T) { @@ -2212,7 +2212,7 @@ func TestNRGHealthCheckWaitForPendingCommitsWhenPaused(t *testing.T) { require_True(t, n.Healthy()) } -func TestNRGHeartbeatCanEstablishQuorumAfterLeaderChange(t *testing.T) { +func TestNRGAppendEntryCanEstablishQuorumAfterLeaderChange(t *testing.T) { n, cleanup := initSingleMemRaftNode(t) defer cleanup() @@ -2224,7 +2224,7 @@ func TestNRGHeartbeatCanEstablishQuorumAfterLeaderChange(t *testing.T) { // Timeline aeMsg := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) - aeHeartbeatResponse := &appendEntryResponse{term: 1, index: 1, peer: nats0, success: true} + aeHeartbeatResponse := &appendEntryResponse{term: 1, index: 2, peer: nats0, success: true} // Process first message. n.processAppendEntry(aeMsg, n.aesub) @@ -2234,16 +2234,16 @@ func TestNRGHeartbeatCanEstablishQuorumAfterLeaderChange(t *testing.T) { // Simulate becoming leader, and not knowing if the stored entry has quorum and can be committed. // Switching to leader should send a heartbeat. n.switchToLeader() - require_Equal(t, n.aflr, 1) + require_Equal(t, n.aflr, 2) require_Equal(t, n.commit, 0) // We simulate receiving the successful heartbeat response here. It should move the commit up. 
n.processAppendEntryResponse(aeHeartbeatResponse) - require_Equal(t, n.commit, 1) - require_Equal(t, n.aflr, 1) + require_Equal(t, n.commit, 2) + require_Equal(t, n.aflr, 2) // Once the entry is applied, it should reset the applied floor. - n.Applied(1) + n.Applied(2) require_Equal(t, n.aflr, 0) } @@ -2251,10 +2251,6 @@ func TestNRGQuorumAccounting(t *testing.T) { n, cleanup := initSingleMemRaftNode(t) defer cleanup() - // Create a sample entry, the content doesn't matter, just that it's stored. - esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) - entries := []*Entry{newEntry(EntryNormal, esm)} - nats1 := "yrzKKRBu" // "nats-1" nats2 := "cnrtt3eg" // "nats-2" @@ -2267,10 +2263,8 @@ func TestNRGQuorumAccounting(t *testing.T) { require_Equal(t, n.csz, 5) require_Equal(t, n.qn, 3) - // Switch this node to leader, and send an entry. + // Switch this node to leader which sends an entry. n.switchToLeader() - require_Equal(t, n.pindex, 0) - n.sendAppendEntry(entries) require_Equal(t, n.pindex, 1) // The first response MUST NOT indicate quorum has been reached. @@ -2286,10 +2280,6 @@ func TestNRGRevalidateQuorumAfterLeaderChange(t *testing.T) { n, cleanup := initSingleMemRaftNode(t) defer cleanup() - // Create a sample entry, the content doesn't matter, just that it's stored. - esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) - entries := []*Entry{newEntry(EntryNormal, esm)} - nats1 := "yrzKKRBu" // "nats-1" nats2 := "cnrtt3eg" // "nats-2" @@ -2302,12 +2292,10 @@ func TestNRGRevalidateQuorumAfterLeaderChange(t *testing.T) { require_Equal(t, n.csz, 5) require_Equal(t, n.qn, 3) - // Switch this node to leader, and send an entry. + // Switch this node to leader which sends an entry. n.term++ n.switchToLeader() require_Equal(t, n.term, 1) - require_Equal(t, n.pindex, 0) - n.sendAppendEntry(entries) require_Equal(t, n.pindex, 1) // We have one server that signals the message was stored. 
The leader will add 1 to the acks count. @@ -2354,10 +2342,10 @@ func TestNRGSignalLeadChangeFalseIfCampaignImmediately(t *testing.T) { n.switchToCandidate() n.switchToLeader() select { - case isLeader := <-n.LeadChangeC(): - require_True(t, isLeader) + case <-n.LeadChangeC(): + t.Error("Expected no leadChange signal") default: - t.Error("Expected leadChange signal") + // Expecting no signal yet. } }, }, @@ -2460,16 +2448,10 @@ func TestNRGIgnoreTrackResponseWhenNotLeader(t *testing.T) { n, cleanup := initSingleMemRaftNode(t) defer cleanup() - // Create a sample entry, the content doesn't matter, just that it's stored. - esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) - entries := []*Entry{newEntry(EntryNormal, esm)} - - // Switch this node to leader, and send two entries. The first will get quorum, the second will not. + // Switch this node to leader which sends an entry. n.term++ n.switchToLeader() require_Equal(t, n.term, 1) - require_Equal(t, n.pindex, 0) - n.sendAppendEntry(entries) require_Equal(t, n.pindex, 1) require_Equal(t, n.pterm, 1) require_Equal(t, n.commit, 0) @@ -3753,6 +3735,35 @@ func TestNRGParallelCatchupRollback(t *testing.T) { require_Equal(t, n.pindex, 2) } +func TestNRGReportLeaderAfterNoopEntry(t *testing.T) { + n, cleanup := initSingleMemRaftNode(t) + defer cleanup() + + require_Equal(t, n.State(), Follower) + require_Equal(t, n.term, 0) + require_False(t, n.Leader()) + + n.switchToCandidate() + require_Equal(t, n.State(), Candidate) + require_Equal(t, n.term, 1) + require_False(t, n.Leader()) + + // Switching to leader will put us into Leader state, + // but we're not necessarily an up-to-date leader yet. + n.switchToLeader() + require_Equal(t, n.State(), Leader) + require_Equal(t, n.term, 1) + require_Equal(t, n.pindex, 1) // Should've sent a NOOP-entry to establish leadership. 
+ require_Equal(t, n.applied, 0) + require_False(t, n.Leader()) + + // Once we commit and apply the final entry, we should start reporting we're leader. + n.commit = 1 + n.Applied(1) + require_Equal(t, n.applied, 1) + require_True(t, n.Leader()) +} + // This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before // proposing the next one. // The test may fail if: