Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: send up-to-date commit index in heartbeats #140

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 16 additions & 13 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,11 @@ type raft struct {

// the leader id
lead uint64
// logSynced is true if this node's log is guaranteed to be a prefix of the
// leader's log at this term. Always true for the leader. Always false for a
// candidate. For a follower, becomes true the first time a MsgApp append to
// the log succeeds.
logSynced bool
// leadTransferee is id of the leader transfer target when its value is not zero.
// Follow the procedure defined in raft thesis 3.10.
leadTransferee uint64
Expand Down Expand Up @@ -667,21 +672,12 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {

// sendHeartbeat sends a heartbeat RPC to the given peer.
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
// Attach the commit as min(to.matched, r.committed).
// When the leader sends out heartbeat message,
// the receiver(follower) might not be matched with the leader
// or it might not have all the committed entries.
// The leader MUST NOT forward the follower's commit to
// an unmatched index.
commit := min(r.trk.Progress[to].Match, r.raftLog.committed)
m := pb.Message{
r.send(pb.Message{
To: to,
Type: pb.MsgHeartbeat,
Commit: commit,
Commit: r.raftLog.committed,
Context: ctx,
}

r.send(m)
})
}

// bcastAppend sends RPC, with entries to all peers that are not up-to-date
Expand Down Expand Up @@ -763,6 +759,7 @@ func (r *raft) reset(term uint64) {
r.Vote = None
}
r.lead = None
r.logSynced = false

r.electionElapsed = 0
r.heartbeatElapsed = 0
Expand Down Expand Up @@ -908,6 +905,7 @@ func (r *raft) becomeLeader() {
r.reset(r.Term)
r.tick = r.tickHeartbeat
r.lead = r.id
r.logSynced = true // the leader's log is in sync with itself
r.state = StateLeader
// Followers enter replicate mode when they've been successfully probed
// (perhaps after having received a snapshot as a result). The leader is
Expand Down Expand Up @@ -1735,6 +1733,7 @@ func (r *raft) handleAppendEntries(m pb.Message) {
return
}
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
r.logSynced = true // from now on, the log is a prefix of the leader's log
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
return
}
Expand Down Expand Up @@ -1770,7 +1769,11 @@ func (r *raft) handleAppendEntries(m pb.Message) {
}

func (r *raft) handleHeartbeat(m pb.Message) {
r.raftLog.commitTo(m.Commit)
// If our log is not a prefix of the leader's log, it is unsafe to advance the
// commit index, because the entries at this index may mismatch.
if r.logSynced {
r.raftLog.commitTo(min(m.Commit, r.raftLog.lastIndex()))
}
r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
}

Expand Down
30 changes: 15 additions & 15 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1332,11 +1332,18 @@ func TestHandleMsgApp(t *testing.T) {
func TestHandleHeartbeat(t *testing.T) {
commit := uint64(2)
tests := []struct {
m pb.Message
wCommit uint64
m pb.Message
logSynced bool
wCommit uint64
}{
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, commit + 1},
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, commit}, // do not decrease commit
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, true, commit + 1},
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, true, commit}, // do not decrease commit
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, false, commit},

// Increase the commit index only if the log is in sync with the leader.
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, false, commit},
// Do not increase the commit index beyond our log size.
{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 10}, true, commit + 1},
}

for i, tt := range tests {
Expand All @@ -1345,6 +1352,8 @@ func TestHandleHeartbeat(t *testing.T) {
sm := newTestRaft(1, 5, 1, storage)
sm.becomeFollower(2, 2)
sm.raftLog.commitTo(commit)
sm.logSynced = tt.logSynced

sm.handleHeartbeat(tt.m)
if sm.raftLog.committed != tt.wCommit {
t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit)
Expand Down Expand Up @@ -2690,10 +2699,6 @@ func TestBcastBeat(t *testing.T) {
if len(msgs) != 2 {
t.Fatalf("len(msgs) = %v, want 2", len(msgs))
}
wantCommitMap := map[uint64]uint64{
2: min(sm.raftLog.committed, sm.trk.Progress[2].Match),
3: min(sm.raftLog.committed, sm.trk.Progress[3].Match),
}
for i, m := range msgs {
if m.Type != pb.MsgHeartbeat {
t.Fatalf("#%d: type = %v, want = %v", i, m.Type, pb.MsgHeartbeat)
Expand All @@ -2704,13 +2709,8 @@ func TestBcastBeat(t *testing.T) {
if m.LogTerm != 0 {
t.Fatalf("#%d: prevTerm = %d, want %d", i, m.LogTerm, 0)
}
if wantCommitMap[m.To] == 0 {
t.Fatalf("#%d: unexpected to %d", i, m.To)
} else {
if m.Commit != wantCommitMap[m.To] {
t.Fatalf("#%d: commit = %d, want %d", i, m.Commit, wantCommitMap[m.To])
}
delete(wantCommitMap, m.To)
if m.Commit != sm.raftLog.committed {
t.Fatalf("#%d: commit = %d, want %d", i, m.Commit, sm.raftLog.committed)
}
if len(m.Entries) != 0 {
t.Fatalf("#%d: len(entries) = %d, want 0", i, len(m.Entries))
Expand Down
1 change: 1 addition & 0 deletions rawnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,7 @@ func TestRawNodeCommitPaginationAfterRestart(t *testing.T) {
if err != nil {
t.Fatal(err)
}
rawNode.raft.logSynced = true // needed to be able to advance the commit index

for highestApplied := uint64(0); highestApplied != 11; {
rd := rawNode.Ready()
Expand Down
14 changes: 7 additions & 7 deletions testdata/async_storage_writes_append_aba_race.txt
Original file line number Diff line number Diff line change
Expand Up @@ -346,16 +346,16 @@ process-ready 4
----
Ready MustSync=false:
Messages:
4->1 MsgHeartbeat Term:3 Log:0/0
4->2 MsgHeartbeat Term:3 Log:0/0
4->3 MsgHeartbeat Term:3 Log:0/0
4->5 MsgHeartbeat Term:3 Log:0/0
4->6 MsgHeartbeat Term:3 Log:0/0
4->7 MsgHeartbeat Term:3 Log:0/0
4->1 MsgHeartbeat Term:3 Log:0/0 Commit:11
4->2 MsgHeartbeat Term:3 Log:0/0 Commit:11
4->3 MsgHeartbeat Term:3 Log:0/0 Commit:11
4->5 MsgHeartbeat Term:3 Log:0/0 Commit:11
4->6 MsgHeartbeat Term:3 Log:0/0 Commit:11
4->7 MsgHeartbeat Term:3 Log:0/0 Commit:11

deliver-msgs 1
----
4->1 MsgHeartbeat Term:3 Log:0/0
4->1 MsgHeartbeat Term:3 Log:0/0 Commit:11
INFO 1 [term: 2] received a MsgHeartbeat message with higher term from 4 [term: 3]
INFO 1 became follower at term 3

Expand Down
176 changes: 176 additions & 0 deletions testdata/lagging_commit.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
# This test demonstrates the effect of delayed commit on a follower node after a
# network hiccup between the leader and this follower.

# Skip logging the boilerplate. Set up a raft group of 3 nodes, and elect node 1
# as the leader. Nodes 2 and 3 are the followers.
log-level none
----
ok

add-nodes 3 voters=(1,2,3) index=10
----
ok

campaign 1
----
ok

stabilize
----
ok

# Propose a couple of entries.
propose 1 data1
----
ok

propose 1 data2
----
ok

process-ready 1
----
ok

# The interesting part starts below.
log-level debug
----
ok

deliver-msgs 2 3
----
1->2 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "data1"]
1->2 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "data2"]
1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "data1"]
1->3 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "data2"]

process-ready 3
----
Ready MustSync=true:
Entries:
1/12 EntryNormal "data1"
1/13 EntryNormal "data2"
Messages:
3->1 MsgAppResp Term:1 Log:0/12
3->1 MsgAppResp Term:1 Log:0/13

# Suppose there is a network blip which prevents the leader from learning that
# the follower 3 has appended the proposed entries to the log.
deliver-msgs drop=(1)
----
dropped: 3->1 MsgAppResp Term:1 Log:0/12
dropped: 3->1 MsgAppResp Term:1 Log:0/13

# In the meantime, the entries are committed, and the leader sends the commit
# index to all the followers.
stabilize 1 2
----
> 2 handling Ready
Ready MustSync=true:
Entries:
1/12 EntryNormal "data1"
1/13 EntryNormal "data2"
Messages:
2->1 MsgAppResp Term:1 Log:0/12
2->1 MsgAppResp Term:1 Log:0/13
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/12
2->1 MsgAppResp Term:1 Log:0/13
> 1 handling Ready
Ready MustSync=false:
HardState Term:1 Vote:1 Commit:13
CommittedEntries:
1/12 EntryNormal "data1"
1/13 EntryNormal "data2"
Messages:
1->2 MsgApp Term:1 Log:1/13 Commit:12
1->3 MsgApp Term:1 Log:1/13 Commit:12
1->2 MsgApp Term:1 Log:1/13 Commit:13
1->3 MsgApp Term:1 Log:1/13 Commit:13
> 2 receiving messages
1->2 MsgApp Term:1 Log:1/13 Commit:12
1->2 MsgApp Term:1 Log:1/13 Commit:13
> 2 handling Ready
Ready MustSync=false:
HardState Term:1 Vote:1 Commit:13
CommittedEntries:
1/12 EntryNormal "data1"
1/13 EntryNormal "data2"
Messages:
2->1 MsgAppResp Term:1 Log:0/13
2->1 MsgAppResp Term:1 Log:0/13
> 1 receiving messages
2->1 MsgAppResp Term:1 Log:0/13
2->1 MsgAppResp Term:1 Log:0/13

# The network blip prevents the follower 3 from learning that the previously
# appended entries are now committed.
deliver-msgs drop=(3)
----
dropped: 1->3 MsgApp Term:1 Log:1/13 Commit:12
dropped: 1->3 MsgApp Term:1 Log:1/13 Commit:13

# The network blip ends here.

status 1
----
1: StateReplicate match=13 next=14
2: StateReplicate match=13 next=14
3: StateReplicate match=11 next=14 inflight=2

# The leader still observes that the entries are in-flight to the follower 3,
# since it hasn't heard from it. Nothing triggers updating the follower's
# commit index, so we have to wait up to the full heartbeat interval before
# the leader sends the commit index.
tick-heartbeat 1
----
ok

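# The leader pushes the real commit index (13) to the follower 3, even though
# Progress.Match for this follower is still 11. It no longer cuts the commit
# index at the Progress.Match mark: the follower itself only advances its commit
# index when its log is known to be a prefix of the leader's log, and never
# beyond its own last index.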
process-ready 1
----
Ready MustSync=false:
Messages:
1->2 MsgHeartbeat Term:1 Log:0/0 Commit:13
1->3 MsgHeartbeat Term:1 Log:0/0 Commit:13

# Since the heartbeat message carries the up-to-date commit index, the follower
# 3 advances its commit index as soon as it receives the heartbeat. As such, the
# total time it takes for the follower to learn the commit index is:
#
# delay = HeartbeatInterval + 1/2 * RTT
#
# Previously, the leader capped the heartbeat's commit index at Progress.Match,
# so the heartbeat did not bump the follower's commit index, and it took another
# roundtrip with the leader to update it:
#
# delay = HeartbeatInterval + 3/2 * RTT
#
# See https://github.com/etcd-io/raft/issues/138 which tracked this problem.
stabilize 1 3
----
> 3 receiving messages
1->3 MsgHeartbeat Term:1 Log:0/0 Commit:13
> 3 handling Ready
Ready MustSync=false:
HardState Term:1 Vote:1 Commit:13
CommittedEntries:
1/12 EntryNormal "data1"
1/13 EntryNormal "data2"
Messages:
3->1 MsgHeartbeatResp Term:1 Log:0/0
> 1 receiving messages
3->1 MsgHeartbeatResp Term:1 Log:0/0
> 1 handling Ready
Ready MustSync=false:
Messages:
1->3 MsgApp Term:1 Log:1/13 Commit:13
> 3 receiving messages
1->3 MsgApp Term:1 Log:1/13 Commit:13
> 3 handling Ready
Ready MustSync=false:
Messages:
3->1 MsgAppResp Term:1 Log:0/13
> 1 receiving messages
3->1 MsgAppResp Term:1 Log:0/13
4 changes: 2 additions & 2 deletions testdata/replicate_pause.txt
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,14 @@ stabilize 1
Ready MustSync=false:
Messages:
1->2 MsgHeartbeat Term:1 Log:0/0 Commit:17
1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11
1->3 MsgHeartbeat Term:1 Log:0/0 Commit:17

stabilize 2 3
----
> 2 receiving messages
1->2 MsgHeartbeat Term:1 Log:0/0 Commit:17
> 3 receiving messages
1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11
1->3 MsgHeartbeat Term:1 Log:0/0 Commit:17
> 2 handling Ready
Ready MustSync=false:
Messages:
Expand Down
4 changes: 2 additions & 2 deletions testdata/snapshot_succeed_via_app_resp.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ process-ready 1
Ready MustSync=false:
Messages:
1->2 MsgHeartbeat Term:1 Log:0/0 Commit:11
1->3 MsgHeartbeat Term:1 Log:0/0
1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11

# Iterate until no more work is done by the new peer. It receives the heartbeat
# and responds.
stabilize 3
----
> 3 receiving messages
1->3 MsgHeartbeat Term:1 Log:0/0
1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11
INFO 3 [term: 0] received a MsgHeartbeat message with higher term from 1 [term: 1]
INFO 3 became follower at term 1
> 3 handling Ready
Expand Down
Loading