Skip to content

Commit

Permalink
API to disallow a node to candidate
Browse files Browse the repository at this point in the history
  • Loading branch information
rledisez committed Jun 12, 2024
1 parent 4c005ab commit ac6c84d
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 6 deletions.
18 changes: 18 additions & 0 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,11 @@ type Raft struct {
// preVoteDisabled control if the pre-vote feature is activated,
// prevote feature is disabled if set to true.
preVoteDisabled bool

// allowCandidating is used in situation where the node must no become a
// leader in case of election.
allowCandidating bool
allowCandidatingLock sync.Mutex
}

// BootstrapCluster initializes a server's storage with the given cluster
Expand Down Expand Up @@ -566,6 +571,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
followerNotifyCh: make(chan struct{}, 1),
mainThreadSaturation: newSaturationMetric([]string{"raft", "thread", "main", "saturation"}, 1*time.Second),
preVoteDisabled: conf.PreVoteDisabled || !transportSupportPreVote,
allowCandidating: true,
}
if !transportSupportPreVote && !conf.PreVoteDisabled {
r.logger.Warn("pre-vote is disabled because it is not supported by the Transport")
Expand Down Expand Up @@ -1270,3 +1276,15 @@ func (r *Raft) LeadershipTransferToServer(id ServerID, address ServerAddress) Fu

return r.initiateLeadershipTransfer(&id, &address)
}

func (r *Raft) SetAllowCandidating(v bool) {
r.allowCandidatingLock.Lock()
defer r.allowCandidatingLock.Unlock()
r.allowCandidating = v
}

func (r *Raft) GetAllowCandidating() bool {
r.allowCandidatingLock.Lock()
defer r.allowCandidatingLock.Unlock()
return r.allowCandidating
}
16 changes: 10 additions & 6 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,12 @@ func (r *Raft) runFollower() {
}
} else {
metrics.IncrCounter([]string{"raft", "transition", "heartbeat_timeout"}, 1)
if hasVote(r.configurations.latest, r.localID) {
if hasVote(r.configurations.latest, r.localID) && r.GetAllowCandidating() {
r.logger.Warn("heartbeat timeout reached, starting election", "last-leader-addr", lastLeaderAddr, "last-leader-id", lastLeaderID)
r.setState(Candidate)
return
} else if !didWarn {
r.logger.Warn("heartbeat timeout reached, not part of a stable configuration or a non-voter, not triggering a leader election")
r.logger.Warn("heartbeat timeout reached, not part of a stable configuration or a non-voter or not allowed to candidate, not triggering a leader election")
didWarn = true
}
}
Expand Down Expand Up @@ -2208,10 +2208,14 @@ func (r *Raft) initiateLeadershipTransfer(id *ServerID, address *ServerAddress)

// timeoutNow is what happens when a server receives a TimeoutNowRequest.
func (r *Raft) timeoutNow(rpc RPC, req *TimeoutNowRequest) {
r.setLeader("", "")
r.setState(Candidate)
r.candidateFromLeadershipTransfer.Store(true)
rpc.Respond(&TimeoutNowResponse{}, nil)
if r.GetAllowCandidating() {
r.setLeader("", "")
r.setState(Candidate)
r.candidateFromLeadershipTransfer.Store(true)
rpc.Respond(&TimeoutNowResponse{}, nil)
} else {
rpc.Respond(nil, fmt.Errorf("not allowed to candidate"))
}
}

// setLatestConfiguration stores the latest configuration and updates a copy of it.
Expand Down

0 comments on commit ac6c84d

Please sign in to comment.