-
Notifications
You must be signed in to change notification settings - Fork 5
/
raft.go
204 lines (178 loc) · 6.28 KB
/
raft.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package autopilot
//
// The methods in this file mainly provide synchronous wrappers around
// Raft operations that would normally return futures.
//
import (
"fmt"
"strconv"
"github.com/hashicorp/raft"
)
// requiredQuorum returns the number of votes that forms a strict majority
// of the given voter count, i.e. floor(voters/2) + 1.
func requiredQuorum(voters int) int {
	half := voters / 2
	return half + 1
}
// NumVoters counts the servers that hold raft.Voter suffrage in the current
// Raft configuration. Autopilot's own cached state is not consulted; a fresh
// configuration is fetched from Raft on every call, and any error from that
// fetch is returned together with a zero count.
func (a *Autopilot) NumVoters() (int, error) {
	config, fetchErr := a.getRaftConfiguration()
	if fetchErr != nil {
		return 0, fetchErr
	}

	voters := 0
	for i := range config.Servers {
		if config.Servers[i].Suffrage != raft.Voter {
			continue
		}
		voters++
	}
	return voters, nil
}
// AddServer is a helper for adding a new server to the raft configuration.
//
// If a server with the same ID and address already exists, nothing is done.
// If a server with the same ID but a different address exists, only its
// address is updated: it is re-added as a voter when it was a voter (or
// staging), otherwise as a non-voter. Any other servers that share the new
// server's address are removed first. Brand-new servers are added in a
// non-voting state; autopilot may later promote them to voting status if
// the configured promoter desires it.
//
// If removing the duplicate-address voters would leave fewer voters than a
// quorum requires, an error is returned instead of performing any Raft
// configuration changes. Once the server has been added, autopilot is
// triggered to remove dead servers, if there are any.
func (a *Autopilot) AddServer(s *Server) error {
	cfg, err := a.getRaftConfiguration()
	if err != nil {
		a.logger.Error("failed to get raft configuration", "error", err)
		return err
	}

	var existingVoter bool
	var voterRemovals []raft.ServerID
	var nonVoterRemovals []raft.ServerID
	var numVoters int
	for _, server := range cfg.Servers {
		if server.Suffrage == raft.Voter {
			numVoters++
		}

		switch {
		case server.Address == s.Address && server.ID == s.ID:
			// nothing to be done as the addr and ID both already match
			return nil
		case server.ID == s.ID:
			// Address-only update: Raft can change the address of an existing
			// ID in place, so the server does not need to be removed first.
			// Remember whether it should keep voting rights when re-added.
			if server.Suffrage == raft.Voter || server.Suffrage == raft.Staging {
				existingVoter = true
			}
		case server.Address == s.Address:
			// A different server already claims this address; it must go.
			if server.Suffrage == raft.Voter {
				voterRemovals = append(voterRemovals, server.ID)
			} else {
				nonVoterRemovals = append(nonVoterRemovals, server.ID)
			}
		}
	}

	// Only voter removals can cost the cluster its quorum, so only check when
	// there are some. Without this guard a configuration with zero voters makes
	// numVoters-requiredVoters negative and every addition would be rejected,
	// even though nothing needs to be removed.
	if len(voterRemovals) > 0 {
		requiredVoters := requiredQuorum(numVoters)
		if len(voterRemovals) > numVoters-requiredVoters {
			return fmt.Errorf("Preventing server addition that would require removal of too many servers and cause cluster instability")
		}
	}

	// Voters are removed before non-voters.
	for _, ids := range [][]raft.ServerID{voterRemovals, nonVoterRemovals} {
		for _, id := range ids {
			if err := a.removeServer(id); err != nil {
				return fmt.Errorf("error removing server %q with duplicate address %q: %w", id, s.Address, err)
			}
			a.logger.Info("removed server with duplicate address", "address", s.Address)
		}
	}

	add := a.addNonVoter
	if existingVoter {
		add = a.addVoter
	}
	if err := add(s.ID, s.Address); err != nil {
		return err
	}

	// Trigger a check to remove dead servers
	a.RemoveDeadServers()
	return nil
}
// RemoveServer removes the server with the given ID from Raft, but only if
// that ID appears in the latest Raft configuration. An unknown ID is a no-op
// and returns nil. Failure to fetch the configuration is logged and returned.
func (a *Autopilot) RemoveServer(id raft.ServerID) error {
	cfg, err := a.getRaftConfiguration()
	if err != nil {
		a.logger.Error("failed to get raft configuration", "error", err)
		return err
	}

	present := false
	for _, srv := range cfg.Servers {
		if srv.ID == id {
			present = true
			break
		}
	}
	if !present {
		return nil
	}
	return a.removeServer(id)
}
// addNonVoter calls AddNonvoter on the Raft interface held by Autopilot and
// waits on the returned future. Any error is logged and returned.
// NOTE(review): the two zero arguments are presumably prevIndex and timeout
// (no index precondition, no enqueue timeout) — confirm against the raft API.
func (a *Autopilot) addNonVoter(id raft.ServerID, addr raft.ServerAddress) error {
	err := a.raft.AddNonvoter(id, addr, 0, 0).Error()
	if err == nil {
		return nil
	}
	a.logger.Error("failed to add raft non-voting peer", "id", id, "address", addr, "error", err)
	return err
}
// addVoter calls AddVoter on the Raft interface held by Autopilot and waits
// on the returned future. Any error is logged and returned.
// NOTE(review): the two zero arguments are presumably prevIndex and timeout
// (no index precondition, no enqueue timeout) — confirm against the raft API.
func (a *Autopilot) addVoter(id raft.ServerID, addr raft.ServerAddress) error {
	err := a.raft.AddVoter(id, addr, 0, 0).Error()
	if err == nil {
		return nil
	}
	a.logger.Error("failed to add raft voting peer", "id", id, "address", addr, "error", err)
	return err
}
// demoteVoter calls DemoteVoter on the Raft interface held by Autopilot and
// waits on the returned future. Any error is logged and returned.
// NOTE(review): the two zero arguments are presumably prevIndex and timeout
// — confirm against the raft API.
func (a *Autopilot) demoteVoter(id raft.ServerID) error {
	demoteFuture := a.raft.DemoteVoter(id, 0, 0)
	err := demoteFuture.Error()
	if err == nil {
		return nil
	}
	a.logger.Error("failed to demote raft peer", "id", id, "error", err)
	return err
}
// removeServer calls RemoveServer on the Raft interface held by Autopilot and
// waits on the returned future. The attempt is logged at debug level, a
// failure is logged as an error and returned, and success is logged at info.
func (a *Autopilot) removeServer(id raft.ServerID) error {
	a.logger.Debug("removing server by ID", "id", id)

	if err := a.raft.RemoveServer(id, 0, 0).Error(); err != nil {
		a.logger.Error("failed to remove raft server", "id", id, "error", err)
		return err
	}

	a.logger.Info("removed server", "id", id)
	return nil
}
// getRaftConfiguration is a wrapper around calling the GetConfiguration
// method on the Raft interface held by Autopilot. It waits on the returned
// future and, on success, returns a pointer to the configuration it carries.
// Errors are returned without logging.
func (a *Autopilot) getRaftConfiguration() (*raft.Configuration, error) {
	future := a.raft.GetConfiguration()
	err := future.Error()
	if err != nil {
		return nil, err
	}
	latest := future.Configuration()
	return &latest, nil
}
// lastTerm fetches the Raft stats map and parses its "last_log_term" entry
// as a base-10 uint64. A missing entry reads as the empty string, which
// makes strconv.ParseUint return an error.
func (a *Autopilot) lastTerm() (uint64, error) {
	stats := a.raft.Stats()
	raw := stats["last_log_term"]
	return strconv.ParseUint(raw, 10, 64)
}
// leadershipTransfer logs the attempt, asks Raft to transfer leadership to
// the server with the given id and address, waits on the returned future,
// and returns that future's error (nil on success).
func (a *Autopilot) leadershipTransfer(id raft.ServerID, address raft.ServerAddress) error {
	a.logger.Info("Transferring leadership to new server", "id", id, "address", address)
	return a.raft.LeadershipTransferToServer(id, address).Error()
}