Skip to content

Commit

Permalink
Merge pull request #10683 from tbg/prs
Browse files Browse the repository at this point in the history
raft: extract progress tracking into own component
  • Loading branch information
tbg authored May 22, 2019
2 parents 416a539 + 5dd4501 commit c38e965
Show file tree
Hide file tree
Showing 10 changed files with 411 additions and 308 deletions.
10 changes: 5 additions & 5 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,15 +353,15 @@ func (n *node) run(r *raft) {
}
case m := <-n.recvc:
// filter out response message from unknown From.
if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
if pr := r.prs.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
r.Step(m)
}
case cc := <-n.confc:
if cc.NodeID == None {
select {
case n.confstatec <- pb.ConfState{
Nodes: r.nodes(),
Learners: r.learnerNodes()}:
Nodes: r.prs.voterNodes(),
Learners: r.prs.learnerNodes()}:
case <-n.done:
}
break
Expand All @@ -384,8 +384,8 @@ func (n *node) run(r *raft) {
}
select {
case n.confstatec <- pb.ConfState{
Nodes: r.nodes(),
Learners: r.learnerNodes()}:
Nodes: r.prs.voterNodes(),
Learners: r.prs.learnerNodes()}:
case <-n.done:
}
case <-n.tickc:
Expand Down
177 changes: 176 additions & 1 deletion raft/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@

package raft

import "fmt"
import (
"fmt"
"sort"
)

const (
ProgressStateProbe ProgressStateType = iota
Expand Down Expand Up @@ -283,3 +286,175 @@ func (in *inflights) reset() {
in.count = 0
in.start = 0
}

// progressTracker tracks the currently active configuration and the information
// known about the nodes and learners in it. In particular, it tracks the match
// index for each peer which in turn allows reasoning about the committed index.
type progressTracker struct {
// nodes maps the ID of each voting member to its Progress. Its size
// determines the quorum.
nodes map[uint64]*Progress
// learners maps the ID of each learner to its Progress.
learners map[uint64]*Progress

// votes holds the votes registered via recordVote, keyed by peer ID
// (true means the vote was granted).
votes map[uint64]bool

// maxInflight is the size handed to newInflights for every Progress
// created by initProgress.
maxInflight int
// matchBuf is a scratch buffer that committed() reuses across calls to
// avoid allocating a new slice each time.
matchBuf uint64Slice
}

// makePRS returns a progressTracker with empty (non-nil) voter, learner and
// vote maps. maxInflight is stored and later used as the inflight limit of
// every progress created through initProgress.
func makePRS(maxInflight int) progressTracker {
	return progressTracker{
		nodes:       make(map[uint64]*Progress),
		learners:    make(map[uint64]*Progress),
		votes:       make(map[uint64]bool),
		maxInflight: maxInflight,
	}
}

// isSingleton reports whether the current configuration contains exactly one
// voting member (i.e. the leader). Learners are not counted.
func (p *progressTracker) isSingleton() bool {
	voters := len(p.nodes)
	return voters == 1
}

// quorum returns the size of a strict majority of the voting members, i.e.
// len(nodes)/2 + 1 using integer division.
func (p *progressTracker) quorum() int {
	half := len(p.nodes) / 2
	return half + 1
}

// hasQuorum reports whether the set m has at least quorum() members. Only the
// size of m is inspected; its keys are not checked against the voters.
func (p *progressTracker) hasQuorum(m map[uint64]struct{}) bool {
	needed := p.quorum()
	return len(m) >= needed
}

// committed returns the largest log index known to be committed, derived from
// the Match index each voting member has acknowledged: the Match values are
// sorted and the entry at position len-quorum() is returned.
func (p *progressTracker) committed() uint64 {
	voters := len(p.nodes)

	// matchBuf is kept between calls purely as an optimization, so that a
	// fresh slice is only allocated when the existing one is too small.
	if cap(p.matchBuf) < voters {
		p.matchBuf = make(uint64Slice, voters)
	}
	p.matchBuf = p.matchBuf[:voters]

	i := 0
	for _, pr := range p.nodes {
		p.matchBuf[i] = pr.Match
		i++
	}

	sort.Sort(&p.matchBuf)
	return p.matchBuf[voters-p.quorum()]
}

// removeAny stops tracking the peer with the given id, whether it is
// currently tracked as a voter or as a learner.
//
// It panics if the peer is tracked in neither role, or if it is tracked in
// both roles at once.
func (p *progressTracker) removeAny(id uint64) {
	pN := p.nodes[id]
	pL := p.learners[id]

	switch {
	case pN == nil && pL == nil:
		// The message must be formatted; a bare string would print a
		// literal "%x" instead of the offending peer id.
		panic(fmt.Sprintf("attempting to remove unknown peer %x", id))
	case pN != nil && pL != nil:
		panic(fmt.Sprintf("peer %x is both voter and learner", id))
	}

	delete(p.nodes, id)
	delete(p.learners, id)
}

// initProgress starts tracking the peer id with the given match and next
// indexes, as a learner if isLearner is set and as a voter otherwise. The new
// progress gets an inflights window sized by maxInflight.
//
// The peer must not already be tracked in either role; if it is,
// initProgress panics.
func (p *progressTracker) initProgress(id, match, next uint64, isLearner bool) {
	if existing := p.nodes[id]; existing != nil {
		panic(fmt.Sprintf("peer %x already tracked as node %v", id, existing))
	}
	if existing := p.learners[id]; existing != nil {
		panic(fmt.Sprintf("peer %x already tracked as learner %v", id, existing))
	}

	// Pick the destination map by role; the progress itself only differs in
	// the IsLearner flag.
	dst := p.nodes
	if isLearner {
		dst = p.learners
	}
	dst[id] = &Progress{
		Next:      next,
		Match:     match,
		ins:       newInflights(p.maxInflight),
		IsLearner: isLearner,
	}
}

// getProgress returns the progress tracked for id. The voters are consulted
// first; if id has no entry there, the learners are consulted. The result is
// nil when the peer is tracked in neither map.
func (p *progressTracker) getProgress(id uint64) *Progress {
	pr, isVoter := p.nodes[id]
	if !isVoter {
		pr = p.learners[id]
	}
	return pr
}

// visit calls f once for every tracked progress: first for all voters, then
// for all learners. The order within each group is Go's map iteration order
// and therefore unspecified.
func (p *progressTracker) visit(f func(id uint64, pr *Progress)) {
	each := func(group map[uint64]*Progress) {
		for id, pr := range group {
			f(id, pr)
		}
	}

	each(p.nodes)
	each(p.learners)
}

// quorumActive reports whether, from the view of the local raft state
// machine, a quorum is active: it counts the tracked progresses that are
// marked RecentActive and are not learners, and compares that count against
// quorum().
func (p *progressTracker) quorumActive() bool {
	active := 0
	p.visit(func(_ uint64, pr *Progress) {
		if pr.IsLearner || !pr.RecentActive {
			return
		}
		active++
	})

	return active >= p.quorum()
}

// voterNodes returns the IDs of all voting members, sorted via uint64Slice.
// The result is a freshly allocated, non-nil slice.
func (p *progressTracker) voterNodes() []uint64 {
	ids := make(uint64Slice, 0, len(p.nodes))
	for id := range p.nodes {
		ids = append(ids, id)
	}
	sort.Sort(ids)
	return ids
}

// learnerNodes returns the IDs of all learners, sorted via uint64Slice.
// The result is a freshly allocated, non-nil slice.
func (p *progressTracker) learnerNodes() []uint64 {
	ids := make(uint64Slice, 0, len(p.learners))
	for id := range p.learners {
		ids = append(ids, id)
	}
	sort.Sort(ids)
	return ids
}

// resetVotes discards all recorded votes by installing a fresh, empty vote
// map, ready for a new round of counting via recordVote.
func (p *progressTracker) resetVotes() {
	p.votes = make(map[uint64]bool)
}

// recordVote records that the node with the given id voted for this Raft
// instance if v == true (and declined it otherwise). Only the first vote
// recorded for an id counts; later calls for the same id are ignored.
func (p *progressTracker) recordVote(id uint64, v bool) {
	if _, seen := p.votes[id]; seen {
		return
	}
	p.votes[id] = v
}

// tallyVotes counts the recorded votes and returns the number granted, the
// number rejected, and the election outcome: electionWon once the granted
// votes reach quorum(), electionLost once the rejected votes do, and
// electionIndeterminate otherwise. A won result takes precedence if both
// counts reach the quorum.
func (p *progressTracker) tallyVotes() (granted int, rejected int, result electionResult) {
	for _, yes := range p.votes {
		if !yes {
			rejected++
			continue
		}
		granted++
	}

	switch q := p.quorum(); {
	case granted >= q:
		result = electionWon
	case rejected >= q:
		result = electionLost
	default:
		result = electionIndeterminate
	}
	return granted, rejected, result
}
Loading

0 comments on commit c38e965

Please sign in to comment.