Merge pull request #1310 from aaronlehmann/raft-stuck-on-startup
raft: Fix infinite election loop
LK4D4 authored Aug 4, 2016
2 parents a210cdd + 7e70556 commit af23e13
Showing 2 changed files with 73 additions and 18 deletions.
82 changes: 64 additions & 18 deletions manager/state/raft/raft.go
@@ -87,18 +87,18 @@ type Node struct {
     StateDir string
     Error    error

-    raftStore   *raft.MemoryStorage
-    memoryStore *store.MemoryStore
-    Config      *raft.Config
-    opts        NewNodeOptions
-    reqIDGen    *idutil.Generator
-    wait        *wait
-    wal         *wal.WAL
-    snapshotter *snap.Snapshotter
-    wasLeader   bool
-    restored    bool
-    isMember    uint32
-    joinAddr    string
+    raftStore           *raft.MemoryStorage
+    memoryStore         *store.MemoryStore
+    Config              *raft.Config
+    opts                NewNodeOptions
+    reqIDGen            *idutil.Generator
+    wait                *wait
+    wal                 *wal.WAL
+    snapshotter         *snap.Snapshotter
+    restored            bool
+    signalledLeadership uint32
+    isMember            uint32
+    joinAddr            string

     // waitProp waits for all the proposals to be terminated before
     // shutting down the node.
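The struct change replaces the wasLeader bool, which only the Run loop touched, with signalledLeadership, a uint32 manipulated through sync/atomic so that goroutines outside the Run loop can query leadership status without taking a lock. A minimal sketch of that load/store pattern, with hypothetical names standing in for the real Node:

package main

import (
	"fmt"
	"sync/atomic"
)

// node is a stand-in for the raft Node; only the flag is modeled.
type node struct {
	signalledLeadership uint32 // 1 once IsLeader has been broadcast
}

// signal is called from the single goroutine that owns leadership state.
func (n *node) signal(leader bool) {
	if leader {
		atomic.StoreUint32(&n.signalledLeadership, 1)
	} else {
		atomic.StoreUint32(&n.signalledLeadership, 0)
	}
}

// readyForProposals is safe to call from any goroutine.
func (n *node) readyForProposals() bool {
	return atomic.LoadUint32(&n.signalledLeadership) == 1
}

func main() {
	n := &node{}
	fmt.Println(n.readyForProposals()) // false
	n.signal(true)
	fmt.Println(n.readyForProposals()) // true
}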
@@ -329,6 +329,8 @@ func (n *Node) Run(ctx context.Context) error {
        close(n.doneCh)
    }()

+   wasLeader := false
+
    for {
        select {
        case <-n.ticker.C():
@@ -389,12 +391,23 @@ func (n *Node) Run(ctx context.Context) error {
            // if that happens we will apply them as any
            // follower would.
            if rd.SoftState != nil {
-               if n.wasLeader && rd.SoftState.RaftState != raft.StateLeader {
-                   n.wasLeader = false
+               if wasLeader && rd.SoftState.RaftState != raft.StateLeader {
+                   wasLeader = false
                    n.wait.cancelAll()
-                   n.leadershipBroadcast.Write(IsFollower)
-               } else if !n.wasLeader && rd.SoftState.RaftState == raft.StateLeader {
-                   n.wasLeader = true
+                   if atomic.LoadUint32(&n.signalledLeadership) == 1 {
+                       atomic.StoreUint32(&n.signalledLeadership, 0)
+                       n.leadershipBroadcast.Write(IsFollower)
+                   }
+               } else if !wasLeader && rd.SoftState.RaftState == raft.StateLeader {
+                   wasLeader = true
+               }
+           }
+
+           if wasLeader && atomic.LoadUint32(&n.signalledLeadership) != 1 {
+               // If all the entries in the log have become
+               // committed, broadcast our leadership status.
+               if n.caughtUp() {
+                   atomic.StoreUint32(&n.signalledLeadership, 1)
                    n.leadershipBroadcast.Write(IsLeader)
                }
            }
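This hunk is the heart of the fix. Previously the node broadcast IsLeader the moment raft reported StateLeader; judging from the PR branch name (raft-stuck-on-startup), subscribers could then act on leadership before the new leader had replayed and committed its log. Now winning an election only sets a local flag, and IsLeader is broadcast once caughtUp() reports the log fully applied; symmetrically, IsFollower is broadcast on demotion only if IsLeader was actually announced. A self-contained toy model of that gating; sim, step, and the printed broadcasts are illustrative names, not part of the commit:

package main

import (
	"fmt"
	"sync/atomic"
)

// sim models just enough state to show the two-phase leadership signal.
type sim struct {
	signalledLeadership uint32
	appliedIndex        uint64
	lastIndex           uint64
}

func (s *sim) caughtUp() bool { return s.appliedIndex >= s.lastIndex }

// step mirrors one pass of the Run loop's leadership handling.
func (s *sim) step(wasLeader *bool, leaderNow bool) {
	if *wasLeader && !leaderNow {
		*wasLeader = false
		if atomic.LoadUint32(&s.signalledLeadership) == 1 {
			atomic.StoreUint32(&s.signalledLeadership, 0)
			fmt.Println("broadcast IsFollower")
		}
	} else if !*wasLeader && leaderNow {
		*wasLeader = true // elected, but stay quiet until caught up
	}
	if *wasLeader && atomic.LoadUint32(&s.signalledLeadership) != 1 && s.caughtUp() {
		atomic.StoreUint32(&s.signalledLeadership, 1)
		fmt.Println("broadcast IsLeader")
	}
}

func main() {
	s := &sim{appliedIndex: 3, lastIndex: 5}
	wasLeader := false
	s.step(&wasLeader, true) // elected; log behind, so nothing printed
	s.appliedIndex = 5
	s.step(&wasLeader, true) // caught up: "broadcast IsLeader"
}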
@@ -506,6 +519,19 @@ func (n *Node) Leader() uint64 {
    return n.Node.Status().Lead
}

+// ReadyForProposals returns true if the node has broadcasted a message
+// saying that it has become the leader. This means it is ready to accept
+// proposals.
+func (n *Node) ReadyForProposals() bool {
+   return atomic.LoadUint32(&n.signalledLeadership) == 1
+}
+
+func (n *Node) caughtUp() bool {
+   // obnoxious function that always returns a nil error
+   lastIndex, _ := n.raftStore.LastIndex()
+   return n.appliedIndex >= lastIndex
+}
+
 // Join asks to a member of the raft to propose
 // a configuration change and add us as a member thus
 // beginning the log replication process. This method
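caughtUp() compares the highest index the node has applied against the last index held in the in-memory raft storage, and ReadyForProposals() exposes the broadcast flag to callers. A hedged usage sketch of how a caller might wait on such a check; waitUntilReady and its polling interval are assumptions for illustration, not part of this commit:

package main

import (
	"context"
	"fmt"
	"time"
)

// waitUntilReady polls a ReadyForProposals-style predicate until it
// returns true or the context expires. Hypothetical helper.
func waitUntilReady(ctx context.Context, ready func() bool) error {
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	for !ready() {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	start := time.Now()
	err := waitUntilReady(ctx, func() bool { return time.Since(start) > 200*time.Millisecond })
	fmt.Println(err) // <nil>
}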
@@ -719,12 +745,24 @@ func (n *Node) RemoveMember(ctx context.Context, id uint64) error {
 // raft state machine with the provided message on the
 // receiving node
 func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessageRequest) (*api.ProcessRaftMessageResponse, error) {
+   if msg == nil || msg.Message == nil {
+       return nil, grpc.Errorf(codes.InvalidArgument, "no message provided")
+   }
+
    // Don't process the message if this comes from
    // a node in the remove set
    if n.cluster.IsIDRemoved(msg.Message.From) {
        return nil, ErrMemberRemoved
    }

+   if msg.Message.Type == raftpb.MsgProp {
+       // We don't accept forwarded proposals. Our
+       // current architecture depends on only the leader
+       // making proposals, so in-flight proposals can be
+       // guaranteed not to conflict.
+       return nil, grpc.Errorf(codes.InvalidArgument, "proposals not accepted")
+   }
+
    // can't stop the raft node while an async RPC is in progress
    n.stopMu.RLock()
    defer n.stopMu.RUnlock()
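ProcessRaftMessage gains two guards: a nil check, since the RPC layer can deliver an empty request, and a rejection of raftpb.MsgProp, because the design relies on proposals originating only at the leader. A stand-alone sketch of the same guards with stand-in types (msgType, message, request are hypothetical); the real handler returns grpc.Errorf(codes.InvalidArgument, ...) as shown in the diff:

package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the api/raftpb message types (hypothetical).
type msgType int

const msgProp msgType = iota

type message struct{ Type msgType }
type request struct{ Message *message }

// checkRaftMessage mirrors the two new guards in ProcessRaftMessage.
func checkRaftMessage(req *request) error {
	if req == nil || req.Message == nil {
		return errors.New("no message provided")
	}
	if req.Message.Type == msgProp {
		// Proposals must originate at the leader; forwarded
		// proposals are refused so in-flight proposals cannot conflict.
		return errors.New("proposals not accepted")
	}
	return nil
}

func main() {
	fmt.Println(checkRaftMessage(nil))
	fmt.Println(checkRaftMessage(&request{Message: &message{Type: msgProp}}))
}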
@@ -982,6 +1020,14 @@ func (n *Node) send(messages []raftpb.Message) error {
            continue
        }

+       if m.Type == raftpb.MsgProp {
+           // We don't forward proposals to the leader. Our
+           // current architecture depends on only the leader
+           // making proposals, so in-flight proposals can be
+           // guaranteed not to conflict.
+           continue
+       }
+
        n.asyncTasks.Add(1)
        go n.sendToMember(members, m)
    }
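The same policy is enforced on the outbound path: send() silently drops MsgProp rather than delivering it, since a follower's raft library would otherwise forward proposals to the leader. A hedged sketch of the filtering as a standalone helper; filterOutgoing is hypothetical (the real code skips the message inline), and the import path assumes the coreos/etcd vendoring swarmkit used at the time:

package main

import (
	"fmt"

	"github.com/coreos/etcd/raft/raftpb"
)

// filterOutgoing drops proposal messages; everything else is kept.
func filterOutgoing(msgs []raftpb.Message) []raftpb.Message {
	kept := msgs[:0]
	for _, m := range msgs {
		if m.Type == raftpb.MsgProp {
			continue // only the leader may propose; never forward
		}
		kept = append(kept, m)
	}
	return kept
}

func main() {
	msgs := []raftpb.Message{{Type: raftpb.MsgProp}, {Type: raftpb.MsgApp}}
	fmt.Println(len(filterOutgoing(msgs))) // 1
}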
@@ -1093,7 +1139,7 @@ func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRa
    ch := n.wait.register(r.ID, cb)

    // Do this check after calling register to avoid a race.
-   if !n.IsLeader() {
+   if atomic.LoadUint32(&n.signalledLeadership) != 1 {
        n.wait.cancel(r.ID)
        return nil, ErrLostLeadership
    }
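processInternalRaftRequest now gates proposals on signalledLeadership rather than the raw raft state, so a node that has won an election but not yet caught up refuses work with ErrLostLeadership. An annotated reading of the changed lines; the comments are added here, and the race description is an inference from the existing comment:

ch := n.wait.register(r.ID, cb)

// Do this check after calling register to avoid a race: if leadership
// were checked first, the node could be demoted (and cancelAll run)
// before register, leaving a waiter that nothing would ever cancel.
if atomic.LoadUint32(&n.signalledLeadership) != 1 {
	n.wait.cancel(r.ID) // undo the registration; the proposal never ran
	return nil, ErrLostLeadership
}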
9 changes: 9 additions & 0 deletions manager/state/raft/testutils/testutils.go
@@ -94,6 +94,9 @@ func WaitForCluster(t *testing.T, clockSource *fakeclock.FakeClock, nodes map[ui
            if cur.Lead != prev.Lead || cur.Term != prev.Term || cur.Applied != prev.Applied {
                return errors.New("state does not match on all nodes")
            }
+           if !n2.ReadyForProposals() {
+               return errors.New("leader not ready")
+           }
            continue nodeLoop
        }
    }
@@ -254,8 +257,14 @@ func NewInitNode(t *testing.T, tc *cautils.TestCA, raftConfig *api.RaftConfig, o
    err := n.Node.JoinAndStart()
    require.NoError(t, err, "can't join cluster")

+   leadershipCh, cancel := n.SubscribeLeadership()
+   defer cancel()
+
    go n.Run(ctx)

+   // Wait for the node to become the leader.
+   <-leadershipCh
+
    if raftConfig != nil {
        assert.NoError(t, n.MemoryStore().Update(func(tx store.Tx) error {
            return store.CreateCluster(tx, &api.Cluster{
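Because leadership is now broadcast only after the log catches up, a freshly started single-node cluster is not immediately ready for proposals; NewInitNode therefore subscribes before starting Run and blocks on the first leadership event. An annotated reading of the added lines (comments added here):

leadershipCh, cancel := n.SubscribeLeadership() // subscribe before Run can broadcast
defer cancel()

go n.Run(ctx)

// Wait for the node to become the leader. Since IsLeader is only
// written once the node has caught up, this receive doubles as a
// readiness barrier for the store updates that follow.
<-leadershipCh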
