Skip to content

Commit

Permalink
raft: wait for cluster readiness on each added node
Browse files Browse the repository at this point in the history
Concurrent joins somehow cause a deadlock; the raft event loop stops with
the following state:
node 1: term 2, leader 1
node 2: term 2, leader 1
node 3: term 2, leader 3

Also:
* changed polling to be less CPU-expensive
* changed the raft tick back, because on slow machines the previous value
  led to re-elections

Signed-off-by: Alexander Morozov <[email protected]>
  • Loading branch information
LK4D4 committed Mar 18, 2016
1 parent c616227 commit 92a3aa6
Showing 1 changed file with 44 additions and 42 deletions.
86 changes: 44 additions & 42 deletions manager/state/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"log"
"net"
"os"
"sync"
"testing"
"time"

Expand All @@ -20,7 +19,7 @@ import (
"github.com/stretchr/testify/assert"
)

const testTick = 50 * time.Millisecond
const testTick = 1 * time.Second

var (
raftLogger = &raft.DefaultLogger{Logger: log.New(ioutil.Discard, "", 0)}
Expand All @@ -30,68 +29,73 @@ func init() {
grpclog.SetLogger(log.New(ioutil.Discard, "", log.LstdFlags))
}

func pollNodeFunc(nodes map[uint64]*Node, f func(*Node) bool) error {
var tickers []*time.Ticker
var wg sync.WaitGroup
for _, n := range nodes {
wg.Add(1)
if f(n) {
wg.Done()
continue
}
tick := time.NewTicker(testTick)
tickers = append(tickers, tick)
go func(n *Node) {
for range tick.C {
if f(n) {
break
}
}
wg.Done()
}(n)
func pollFunc(f func() bool) error {
if f() {
return nil
}
tick := time.NewTicker(100 * time.Millisecond)
defer tick.Stop()
done := make(chan struct{})
go func() {
wg.Wait()
for range tick.C {
if f() {
break
}
}
close(done)
}()
select {
case <-done:
return nil
case <-time.After(32 * time.Second):
for _, tick := range tickers {
tick.Stop()
}
return fmt.Errorf("polling failed")
}
}

func getTermAndLeader(n *Node) (uint64, uint64) {
// leaderStatus pairs the leader ID and the term reported by a node's
// Status(), so the two values can be compared across nodes.
type leaderStatus struct {
// leader is filled from status.Lead.
leader uint64
// term is filled from status.Term.
term uint64
}

func getTermAndLeader(n *Node) *leaderStatus {
status := n.Status()
return status.Lead, status.Term
return &leaderStatus{leader: status.Lead, term: status.Term}
}

// waitForCluster waits until all nodes agree on the same leader and term,
// and that leader is one of the specified nodes.
func waitForCluster(t *testing.T, nodes map[uint64]*Node) {
assert.NoError(t, pollNodeFunc(nodes, func(n *Node) bool {
l, t := getTermAndLeader(n)
if _, ok := nodes[l]; !ok {
return false
}
for _, node := range nodes {
rl, rt := getTermAndLeader(node)
if rl != l || rt != t {
err := pollFunc(func() bool {
var prev *leaderStatus
for _, n := range nodes {
if prev == nil {
prev = getTermAndLeader(n)
if _, ok := nodes[prev.leader]; !ok {
return false
}
continue
}
cur := getTermAndLeader(n)
if _, ok := nodes[cur.leader]; !ok {
return false
}
if cur.leader != prev.leader || cur.term != prev.term {
return false
}
}
return true
}))
})
assert.NoError(t, err)
}

// waitForPeerNumber waits until every node reports the specified number of peers.
func waitForPeerNumber(t *testing.T, nodes map[uint64]*Node, count int) {
assert.NoError(t, pollNodeFunc(nodes, func(n *Node) bool {
return len(n.Cluster.Peers()) == count
assert.NoError(t, pollFunc(func() bool {
for _, n := range nodes {
if len(n.Cluster.Peers()) != count {
return false
}
}
return true
}))
}

Expand Down Expand Up @@ -183,16 +187,14 @@ func newJoinNode(t *testing.T, id uint64, join string) *Node {
// After stopping, we should receive an error from Serve
assert.Error(t, <-done)
}()

return n
}

func newRaftCluster(t *testing.T) map[uint64]*Node {
nodes := make(map[uint64]*Node)
nodes[1] = newInitNode(t, 1)
nodes[2] = newJoinNode(t, 2, nodes[1].Listener.Addr().String())
nodes[3] = newJoinNode(t, 3, nodes[1].Listener.Addr().String())
waitForCluster(t, nodes)
addRaftNode(t, nodes)
addRaftNode(t, nodes)
return nodes
}

Expand Down

0 comments on commit 92a3aa6

Please sign in to comment.