-
Notifications
You must be signed in to change notification settings - Fork 5
/
run.go
196 lines (166 loc) · 5.79 KB
/
run.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package autopilot
import (
"context"
"time"
)
// Start will launch the go routines in the background to perform Autopilot.
// When the context passed in is cancelled or the Stop method is called
// then these routines will exit.
func (a *Autopilot) Start(ctx context.Context) {
a.execLock.Lock()
defer a.execLock.Unlock()
// already running so there is nothing to do
if a.execution != nil && a.execution.status == Running {
return
}
ctx, shutdown := context.WithCancel(ctx)
exec := &execInfo{
status: Running,
shutdown: shutdown,
done: make(chan struct{}),
}
if a.execution == nil || a.execution.status == NotRunning {
// In theory with a nil execution or the current execution being in the not
// running state, we should be able to immediately gain the leader lock as
// nothing else should be running and holding the lock. While true we still
// gain the lock to ensure that only one thread may even attempt to be
// modifying the autopilot state at once.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if err := a.leaderLock.TryLock(ctx); err == nil {
a.updateState(ctx)
a.leaderLock.Unlock()
}
}
go a.beginExecution(ctx, exec)
a.execution = exec
return
}
// Stop will terminate the go routines being executed to perform autopilot.
func (a *Autopilot) Stop() <-chan struct{} {
a.execLock.Lock()
defer a.execLock.Unlock()
// Nothing to do
if a.execution == nil || a.execution.status == NotRunning {
done := make(chan struct{})
close(done)
return done
}
a.execution.shutdown()
a.execution.status = ShuttingDown
return a.execution.done
}
// IsRunning returns the current execution status of the autopilot
// go routines as well as a chan which will be closed when the
// routines are no longer running
func (a *Autopilot) IsRunning() (ExecutionStatus, <-chan struct{}) {
a.execLock.Lock()
defer a.execLock.Unlock()
if a.execution == nil || a.execution.status == NotRunning {
done := make(chan struct{})
close(done)
return NotRunning, done
}
return a.execution.status, a.execution.done
}
func (a *Autopilot) finishExecution(exec *execInfo) {
// need to gain the lock because if this was the active execution
// then these values may be read while they are updated.
a.execLock.Lock()
defer a.execLock.Unlock()
exec.shutdown = nil
exec.status = NotRunning
// this should be the final cleanup task as it is what notifies the rest
// of the world that we are now done
close(exec.done)
exec.done = nil
}
func (a *Autopilot) beginExecution(ctx context.Context, exec *execInfo) {
// This will wait for any other go routine to finish executing
// before running any code ourselves to prevent any conflicting
// activity between the two.
if err := a.leaderLock.TryLock(ctx); err != nil {
a.finishExecution(exec)
return
}
a.logger.Debug("autopilot is now running")
// autopilot needs to do 3 things
//
// 1. periodically update the cluster state
// 2. periodically check for and perform promotions and demotions
// 3. Respond to servers leaving and prune dead servers
//
// We could attempt to do all of this in a single go routine except that
// updating the cluster health could potentially take long enough to impact
// the periodicity of the promotions and demotions performed by task 2/3.
// So instead this go routine will spawn a second go routine to manage
// updating the cluster health in the background. This go routine is still
// in control of the overall running status and will not exit until the
// child go routine has exited.
// child go routine for cluster health updating
stateUpdaterDone := make(chan struct{})
go a.runStateUpdater(ctx, stateUpdaterDone)
// cleanup for once we are stopped
defer func() {
// block waiting for our child go routine to also finish
<-stateUpdaterDone
a.logger.Debug("autopilot is now stopped")
// We need to gain this lock so that we can zero out the previous state.
// This prevents us from accidentally tracking stale state in the event
// that we used to be the leader at some point in time, then weren't
// and now are again. In particular this will ensure that that we forget
// about our tracking of the firstStateTime so that once restarted, we
// will ignore server stabilization time just like we do the very
// first time this process ever was the leader.
//
// This isn't included in finishExecution so that we don't perform it
// if we fail to gain the leaderLock before the context gets cancelled
// back at the beginning of this function.
a.stateLock.Lock()
defer a.stateLock.Unlock()
a.state = &State{}
a.finishExecution(exec)
a.leaderLock.Unlock()
}()
reconcileTicker := time.NewTicker(a.reconcileInterval)
defer reconcileTicker.Stop()
for {
select {
case <-ctx.Done():
return
case <-reconcileTicker.C:
if err := a.reconcile(); err != nil {
a.logger.Error("Failed to reconcile current state with the desired state",
"error", err)
}
if err := a.pruneDeadServers(); err != nil {
a.logger.Error("Failed to prune dead servers", "error", err)
}
case <-a.removeDeadCh:
if err := a.pruneDeadServers(); err != nil {
a.logger.Error("Failed to prune dead servers", "error", err)
}
}
}
}
// runStateUpdated will periodically update the autopilot state until the context
// passed in is cancelled. When finished the provide done chan will be closed.
func (a *Autopilot) runStateUpdater(ctx context.Context, done chan struct{}) {
a.logger.Debug("state update routine is now running")
defer func() {
a.logger.Debug("state update routine is now stopped")
close(done)
}()
ticker := time.NewTicker(a.updateInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
a.updateState(ctx)
}
}
}