-
Notifications
You must be signed in to change notification settings - Fork 14
/
coordinator.go
488 lines (422 loc) · 14 KB
/
coordinator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
package m_etcd
import (
"encoding/json"
"errors"
"os"
"path"
"strings"
"time"
"code.google.com/p/go-uuid/uuid"
"github.com/coreos/go-etcd/etcd"
"github.com/lytics/metafora"
)
const (
	// etcd/Response.Action values matched against by the coordinator's
	// watch loops (see releaseActions / newActions below).
	actionCreated = "create"
	actionSet     = "set"
	actionExpire  = "expire"
	actionDelete  = "delete"
	actionCAD     = "compareAndDelete"
)
var (
	// ClaimTTL is the default TTL in seconds for task claims. It is copied
	// into each EtcdCoordinator at construction so it can be overridden
	// per-coordinator (see NewEtcdCoordinator).
	ClaimTTL uint64 = 120 // seconds

	// DefaultNodePathTTL is the TTL in seconds for this node's registration
	// directory; nodeRefresher keeps it alive while the coordinator runs.
	DefaultNodePathTTL uint64 = 20 // seconds

	// etcd actions signifying a claim key was released
	releaseActions = map[string]bool{
		actionExpire: true,
		actionDelete: true,
		actionCAD:    true,
	}

	// etcd actions signifying a new key
	newActions = map[string]bool{
		actionCreated: true,
		actionSet:     true,
	}

	// restartWatchError signals the watch index is too old; the caller
	// must re-Get and restart its watch from a fresh index.
	// NOTE(review): Go convention would name this errRestartWatch; left
	// unchanged because other code in this package references it.
	restartWatchError = errors.New("index too old, need to restart watch")
)
// ownerValue is the JSON payload identifying which node wrote a key (it is
// embedded in the metadata marker written by upsertDir).
type ownerValue struct {
	Node string `json:"node"`
}
// EtcdCoordinator is a metafora.Coordinator implementation backed by etcd.
// It watches the namespace's task path for claimable tasks and this node's
// command path for commands, delegating claim bookkeeping to a taskManager.
type EtcdCoordinator struct {
	Client  *etcd.Client
	cordCtx metafora.CoordinatorContext

	namespace string // absolute etcd path prefix ("/<ns>"), no trailing slash
	taskPath  string // <namespace>/<TasksPath>; watched for claimable tasks

	ClaimTTL uint64 // seconds; passed to the taskManager for claims

	NodeID      string
	nodePath    string // <namespace>/<NodesPath>/<NodeID>; kept alive by nodeRefresher
	nodePathTTL uint64 // seconds
	commandPath string // <nodePath>/<CommandsPath>; watched by Command()

	taskManager *taskManager

	// Close() closes stop channel to signal to watchers to exit
	stop chan bool
}
// closed reports whether Close() has been called, via a non-blocking
// receive on the stop channel.
func (ec *EtcdCoordinator) closed() bool {
	stopped := false
	select {
	case <-ec.stop:
		stopped = true
	default:
		// stop channel still open; coordinator is running.
	}
	return stopped
}
// NewEtcdCoordinator creates a new Metafora Coordinator implementation using
// etcd as the broker. If no node ID is specified, a unique one will be
// generated.
//
// Coordinator methods will be called by the core Metafora Consumer. Calling
// Init, Close, etc. from your own code will lead to undefined behavior.
func NewEtcdCoordinator(nodeID, namespace string, client *etcd.Client) metafora.Coordinator {
	// Namespace should be an absolute path with no trailing slash
	namespace = "/" + strings.Trim(namespace, "/ ")

	// Generate a unique node ID when the caller didn't supply one.
	if nodeID == "" {
		host, _ := os.Hostname()
		nodeID = host + "-" + uuid.NewRandom().String()
	}
	nodeID = strings.Trim(nodeID, "/ ")

	client.SetTransport(transport)
	client.SetConsistency(etcd.STRONG_CONSISTENCY)

	ec := &EtcdCoordinator{
		Client:    client,
		namespace: namespace,

		taskPath: path.Join(namespace, TasksPath),
		ClaimTTL: ClaimTTL, //default to the package constant, but allow it to be overwritten

		NodeID:      nodeID,
		nodePath:    path.Join(namespace, NodesPath, nodeID),
		nodePathTTL: DefaultNodePathTTL,
		commandPath: path.Join(namespace, NodesPath, nodeID, CommandsPath),

		stop: make(chan bool),
	}
	return ec
}
// Init is called once by the consumer to provide a Logger to Coordinator
// implementations.
func (ec *EtcdCoordinator) Init(cordCtx metafora.CoordinatorContext) error {
	cordCtx.Log(metafora.LogLevelDebug, "Initializing coordinator with namespace: %s and etcd cluster: %s",
		ec.namespace, strings.Join(ec.Client.GetCluster(), ", "))

	ec.cordCtx = cordCtx

	// Best-effort creation of the namespace and task directories; errors
	// are logged inside upsertDir rather than returned.
	ec.upsertDir(ec.namespace, ForeverTTL)
	ec.upsertDir(ec.taskPath, ForeverTTL)

	// Registering this node is mandatory; failure aborts initialization.
	if _, err := ec.Client.CreateDir(ec.nodePath, ec.nodePathTTL); err != nil {
		return err
	}

	// Keep the TTL'd node registration alive until Close().
	go ec.nodeRefresher()

	ec.upsertDir(ec.commandPath, ForeverTTL)

	ec.taskManager = newManager(cordCtx, ec.Client, ec.taskPath, ec.NodeID, ec.ClaimTTL)
	return nil
}
// upsertDir ensures directory dir exists in etcd, creating it with the
// given ttl when missing. When it creates the directory it also writes a
// hidden metadata marker key recording which node created it and when.
//
// Errors are logged, not returned; callers treat this as best-effort.
func (ec *EtcdCoordinator) upsertDir(dir string, ttl uint64) {
	// Note: the parameter was renamed from "path" to avoid shadowing the
	// imported "path" package.
	const sorted = false
	const recursive = false
	_, err := ec.Client.Get(dir, sorted, recursive)
	if err == nil {
		// Directory already exists; nothing to do.
		return
	}
	etcdErr, ok := err.(*etcd.EtcdError)
	if !ok || etcdErr.ErrorCode != EcodeKeyNotFound {
		// Unexpected error; don't attempt creation. (Matches the original
		// behavior of silently ignoring non-KeyNotFound errors.)
		return
	}

	if _, err := ec.Client.CreateDir(dir, ttl); err != nil {
		ec.cordCtx.Log(metafora.LogLevelDebug, "Error trying to create directory. path:[%s] error:[ %v ]", dir, err)
		// BUG FIX: previously the metadata marker was still written even
		// when directory creation failed; bail out instead.
		return
	}

	// Hidden etcd key that isn't visible to ls commands on the directory;
	// you have to know about it to find it :). It records some info about
	// when the cluster's schema was set up.
	host, _ := os.Hostname()
	metadata := struct {
		Host        string `json:"host"`
		CreatedTime string `json:"created"`
		ownerValue
	}{
		Host:        host,
		CreatedTime: time.Now().String(),
		ownerValue:  ownerValue{Node: ec.NodeID},
	}
	// Marshal cannot fail on this static struct; error deliberately ignored.
	metadataB, _ := json.Marshal(metadata)
	// Best-effort create; if another node raced us the marker already exists.
	ec.Client.Create(path.Join(dir, MetadataKey), string(metadataB), ttl)
}
// nodeRefresher keeps the node entry in etcd alive by periodically
// refreshing its TTL. If it's unable to communicate with etcd it must shut
// down the coordinator.
//
// watch retries on errors and taskmgr calls Lost(task) on tasks it can't
// refresh, so it's up to nodeRefresher to cause the coordinator to close if
// it's unable to communicate with etcd.
func (ec *EtcdCoordinator) nodeRefresher() {
	// Refresh at half the TTL (minimum 1s) to leave leeway before expiry.
	interval := ec.nodePathTTL >> 1
	if interval == 0 {
		interval = 1
	}
	for {
		// Deadline for refreshes to finish by or the coordinator closes.
		deadline := time.Now().Add(time.Duration(ec.nodePathTTL) * time.Second)
		select {
		case <-ec.stop:
			return
		case <-time.After(time.Duration(interval) * time.Second):
			err := ec.refreshBy(deadline)
			if err == nil {
				continue
			}
			// We're in a bad state; shut everything down
			ec.cordCtx.Log(metafora.LogLevelError,
				"Unable to refresh node key before deadline %s. Last error: %v", deadline, err)
			ec.Close()
		}
	}
}
// refreshBy repeatedly attempts to refresh the node key's TTL until either
// a refresh succeeds or the deadline passes. The last error encountered (if
// any) is returned.
func (ec *EtcdCoordinator) refreshBy(deadline time.Time) (err error) {
	for time.Now().Before(deadline) {
		// Bail out early if the coordinator has been closed.
		select {
		case <-ec.stop:
			return err
		default:
		}

		if _, err = ec.Client.UpdateDir(ec.nodePath, ec.nodePathTTL); err == nil {
			// It worked!
			return nil
		}
		ec.cordCtx.Log(metafora.LogLevelWarn, "Unexpected error updating node key: %v", err)
		transport.CloseIdleConnections()  // paranoia; let's get fresh connections on errors.
		time.Sleep(500 * time.Millisecond) // rate limit retries a bit
	}
	// Didn't get a successful response before deadline, exit with error
	return err
}
// Watch will do a blocking etcd watch on taskPath until a claimable task is
// found or Close() is called.
//
// Watch will return ("", nil) if the coordinator is closed; any other error
// is propagated to the caller.
func (ec *EtcdCoordinator) Watch() (taskID string, err error) {
	if ec.closed() {
		// already closed, don't restart watch
		return "", nil
	}

	const sorted = true
	const recursive = true

startWatch:
	for {
		// Get existing tasks
		resp, err := ec.Client.Get(ec.taskPath, sorted, recursive)
		if err != nil {
			ec.cordCtx.Log(metafora.LogLevelError, "%s Error getting the existing tasks: %v", ec.taskPath, err)
			return "", err
		}

		// Start watching at the index the Get retrieved since we've retrieved all
		// tasks up to that point.
		index := resp.EtcdIndex

		// Act like existing keys are newly created
		for _, node := range resp.Node.Nodes {
			if node.ModifiedIndex > index {
				// Record the max modified index to keep Watch from picking up redundant events
				index = node.ModifiedIndex
			}
			if task, ok := ec.parseTask(&etcd.Response{Action: "create", Node: node}); ok {
				return task, nil
			}
		}

		// Start blocking watch
		for {
			resp, err := ec.watch(ec.taskPath, index)
			if err != nil {
				if err == restartWatchError {
					continue startWatch
				}
				if err == etcd.ErrWatchStoppedByUser {
					return "", nil
				}
				// BUG FIX: any other error (e.g. an unmarshalling failure)
				// previously fell through with resp == nil and caused a nil
				// pointer dereference in parseTask. Propagate it instead.
				return "", err
			}

			// Found a claimable task! Return it.
			if task, ok := ec.parseTask(resp); ok {
				return task, nil
			}

			// Task wasn't claimable, start next watch from where the last watch ended
			index = resp.EtcdIndex
		}
	}
}
// parseTask inspects an etcd response from the task path and reports
// whether it represents a claimable task. It returns the task ID and true
// for claimable tasks, or ("", false) for everything else.
func (ec *EtcdCoordinator) parseTask(resp *etcd.Response) (task string, ok bool) {
	// Sanity check / test path invariant
	if !strings.HasPrefix(resp.Node.Key, ec.taskPath) {
		ec.cordCtx.Log(metafora.LogLevelError, "Received task from outside task path: %s", resp.Node.Key)
		return "", false
	}

	// Strip leading/trailing slashes and split into path segments.
	segments := strings.Split(strings.Trim(resp.Node.Key, "/"), "/")

	switch {
	case newActions[resp.Action] && len(segments) == 3 && resp.Node.Dir:
		// New task directory; make sure it's not already claimed before
		// returning it.
		for _, child := range resp.Node.Nodes {
			if strings.HasSuffix(child.Key, OwnerMarker) {
				ec.cordCtx.Log(metafora.LogLevelDebug, "Ignoring task as it's already claimed: %s", segments[2])
				return "", false
			}
		}
		ec.cordCtx.Log(metafora.LogLevelDebug, "Received new task: %s", segments[2])
		return segments[2], true

	case releaseActions[resp.Action] && len(segments) == 4 && segments[3] == OwnerMarker:
		// A claim key was removed; the task may be claimable again.
		ec.cordCtx.Log(metafora.LogLevelDebug, "Received released task: %s", segments[2])
		return segments[2], true
	}

	// Ignore any other key events (_metafora keys, task deletion, etc.)
	return "", false
}
// Claim is called by the Consumer when a Balancer has determined that a task
// ID can be claimed. Claim returns false if another consumer has already
// claimed the ID.
func (ec *EtcdCoordinator) Claim(taskID string) bool {
	// Delegates entirely to the taskManager.
	return ec.taskManager.add(taskID)
}
// Release deletes the claim file.
func (ec *EtcdCoordinator) Release(taskID string) {
	// done=false: the task is being released for others to claim, not
	// marked complete.
	ec.taskManager.remove(taskID, false)
}
// Done deletes the task.
func (ec *EtcdCoordinator) Done(taskID string) {
	// done=true: the task is finished and should be removed entirely.
	ec.taskManager.remove(taskID, true)
}
// Command blocks until a command for this node is received from the broker
// by the coordinator.
//
// Command returns (nil, nil) if the coordinator is closed; any other error
// is propagated to the caller.
func (ec *EtcdCoordinator) Command() (metafora.Command, error) {
	if ec.closed() {
		// already closed, don't restart watch
		return nil, nil
	}

	const sorted = true
	const recursive = true

startWatch:
	for {
		// Get existing commands
		resp, err := ec.Client.Get(ec.commandPath, sorted, recursive)
		if err != nil {
			ec.cordCtx.Log(metafora.LogLevelError, "%s Error getting the existing commands: %v", ec.commandPath, err)
			return nil, err
		}

		// Start watching at the index the Get retrieved since we've retrieved all
		// tasks up to that point.
		index := resp.EtcdIndex

		// Act like existing keys are newly created
		for _, node := range resp.Node.Nodes {
			if node.ModifiedIndex > index {
				// Record the max modified index to keep Watch from picking up redundant events
				index = node.ModifiedIndex
			}
			if cmd := ec.parseCommand(&etcd.Response{Action: "create", Node: node}); cmd != nil {
				return cmd, nil
			}
		}

		// Start blocking watch
		for {
			resp, err := ec.watch(ec.commandPath, index)
			if err != nil {
				if err == restartWatchError {
					continue startWatch
				}
				if err == etcd.ErrWatchStoppedByUser {
					return nil, nil
				}
				// BUG FIX: any other error (e.g. an unmarshalling failure)
				// previously fell through with resp == nil and caused a nil
				// pointer dereference in parseCommand. Propagate it instead.
				return nil, err
			}

			if cmd := ec.parseCommand(resp); cmd != nil {
				return cmd, nil
			}

			// Not an actionable command; resume watching from the last index.
			index = resp.EtcdIndex
		}
	}
}
// parseCommand deletes the received command key from etcd and unmarshals
// its value into a metafora.Command. Metadata markers and unparsable
// payloads yield nil.
func (ec *EtcdCoordinator) parseCommand(resp *etcd.Response) metafora.Command {
	// Skip metadata marker
	if strings.HasSuffix(resp.Node.Key, MetadataKey) {
		return nil
	}

	// Remove the key regardless of whether it parses; it has been consumed
	// either way.
	const recurse = false
	_, delErr := ec.Client.Delete(resp.Node.Key, recurse)
	if delErr != nil {
		ec.cordCtx.Log(metafora.LogLevelError, "Error deleting handled command %s: %v", resp.Node.Key, delErr)
	}

	cmd, err := metafora.UnmarshalCommand([]byte(resp.Node.Value))
	if err != nil {
		ec.cordCtx.Log(metafora.LogLevelError, "Invalid command %s: %v", resp.Node.Key, err)
		return nil
	}
	return cmd
}
// Close stops the coordinator and causes blocking Watch and Command methods to
// return zero values.
func (ec *EtcdCoordinator) Close() {
	// Gracefully handle multiple close calls mostly to ease testing. This block
	// isn't threadsafe, so you shouldn't try to call Close() concurrently.
	select {
	case <-ec.stop:
		return
	default:
	}
	close(ec.stop)

	ec.taskManager.stop()

	// Finally remove the node entry
	const recursive = true
	if _, err := ec.Client.Delete(ec.nodePath, recursive); err != nil {
		eerr, ok := err.(*etcd.EtcdError)
		if ok && eerr.ErrorCode == EcodeKeyNotFound {
			// The node's TTL was up before we were able to delete it or there was
			// another problem that's already being handled.
			// The first is unlikely, the latter is already being handled, so
			// there's nothing to do here.
			return
		}
		// All other errors are unexpected
		ec.cordCtx.Log(metafora.LogLevelError, "Error deleting node path %s: %v", ec.nodePath, err)
	}
}
// watch will return either an etcd Response or an error. Two errors returned
// by this method should be treated specially:
//
//  1. etcd.ErrWatchStoppedByUser - the coordinator has closed, exit
//     accordingly
//
//  2. restartWatchError - the specified index is too old, try again with a
//     newer index
func (ec *EtcdCoordinator) watch(path string, index uint64) (*etcd.Response, error) {
	const recursive = true
	for {
		// Start the blocking watch after the last response's index.
		rawResp, err := ec.Client.RawWatch(path, index+1, recursive, nil, ec.stop)
		if err != nil {
			if err == etcd.ErrWatchStoppedByUser {
				// This isn't actually an error, the stop chan was closed. Time to stop!
				return nil, err
			}
			// Other RawWatch errors should be retried forever. If the node refresher
			// also fails to communicate with etcd it will close the coordinator,
			// closing ec.stop in the process which will cause this function to
			// return with ErrWatchStoppedByUser.
			ec.cordCtx.Log(metafora.LogLevelError, "%s Retrying after unexpected watch error: %v", path, err)
			transport.CloseIdleConnections() // paranoia; let's get fresh connections on errors.
			continue
		}

		if len(rawResp.Body) == 0 {
			// This is a bug in Go's HTTP + go-etcd + etcd which causes the
			// connection to timeout periodically and need to be restarted *after*
			// closing idle connections.
			transport.CloseIdleConnections()
			continue
		}

		resp, err := rawResp.Unmarshal()
		if err != nil {
			if ee, ok := err.(*etcd.EtcdError); ok {
				if ee.ErrorCode == EcodeExpiredIndex {
					ec.cordCtx.Log(metafora.LogLevelDebug, "%s Too many events have happened since index was updated. Restarting watch.", ec.taskPath)
					// We need to retrieve all existing tasks to update our index
					// without potentially missing some events.
					return nil, restartWatchError
				}
			}
			ec.cordCtx.Log(metafora.LogLevelError, "%s Unexpected error unmarshalling etcd response: %+v", ec.taskPath, err)
			return nil, err
		}
		return resp, nil
	}
}