Skip to content

Commit

Permalink
API to disallow a node to candidate
Browse files Browse the repository at this point in the history
  • Loading branch information
rledisez committed Feb 20, 2024
1 parent bc7d147 commit 88f0fc5
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 6 deletions.
18 changes: 18 additions & 0 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ type Raft struct {

// mainThreadSaturation measures the saturation of the main raft goroutine.
mainThreadSaturation *saturationMetric

// allowCandidating is used in situation where the node must no become a
// leader in case of election.
allowCandidating bool
allowCandidatingLock sync.Mutex
}

// BootstrapCluster initializes a server's storage with the given cluster
Expand Down Expand Up @@ -560,6 +565,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
leaderNotifyCh: make(chan struct{}, 1),
followerNotifyCh: make(chan struct{}, 1),
mainThreadSaturation: newSaturationMetric([]string{"raft", "thread", "main", "saturation"}, 1*time.Second),
allowCandidating: true,
}

r.conf.Store(*conf)
Expand Down Expand Up @@ -1261,3 +1267,15 @@ func (r *Raft) LeadershipTransferToServer(id ServerID, address ServerAddress) Fu

return r.initiateLeadershipTransfer(&id, &address)
}

func (r *Raft) SetAllowCandidating(v bool) {
r.allowCandidatingLock.Lock()
defer r.allowCandidatingLock.Unlock()
r.allowCandidating = v
}

func (r *Raft) GetAllowCandidating() bool {
r.allowCandidatingLock.Lock()
defer r.allowCandidatingLock.Unlock()
return r.allowCandidating
}
16 changes: 10 additions & 6 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,12 @@ func (r *Raft) runFollower() {
}
} else {
metrics.IncrCounter([]string{"raft", "transition", "heartbeat_timeout"}, 1)
if hasVote(r.configurations.latest, r.localID) {
if hasVote(r.configurations.latest, r.localID) && r.GetAllowCandidating() {
r.logger.Warn("heartbeat timeout reached, starting election", "last-leader-addr", lastLeaderAddr, "last-leader-id", lastLeaderID)
r.setState(Candidate)
return
} else if !didWarn {
r.logger.Warn("heartbeat timeout reached, not part of a stable configuration or a non-voter, not triggering a leader election")
r.logger.Warn("heartbeat timeout reached, not part of a stable configuration or a non-voter or not allowed to candidate, not triggering a leader election")
didWarn = true
}
}
Expand Down Expand Up @@ -1988,10 +1988,14 @@ func (r *Raft) initiateLeadershipTransfer(id *ServerID, address *ServerAddress)

// timeoutNow is what happens when a server receives a TimeoutNowRequest.
func (r *Raft) timeoutNow(rpc RPC, req *TimeoutNowRequest) {
r.setLeader("", "")
r.setState(Candidate)
r.candidateFromLeadershipTransfer.Store(true)
rpc.Respond(&TimeoutNowResponse{}, nil)
if r.GetAllowCandidating() {
r.setLeader("", "")
r.setState(Candidate)
r.candidateFromLeadershipTransfer.Store(true)
rpc.Respond(&TimeoutNowResponse{}, nil)
} else {
rpc.Respond(nil, fmt.Errorf("not allowed to candidate"))
}
}

// setLatestConfiguration stores the latest configuration and updates a copy of it.
Expand Down

0 comments on commit 88f0fc5

Please sign in to comment.