forked from vmihailenco/taskq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathregistry.go
88 lines (72 loc) · 1.63 KB
/
registry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package taskq
import (
"fmt"
"sync"
"time"
)
// Tasks is a package-level TaskMap. It is declared as a zero value, so it
// starts out with no entries.
var Tasks TaskMap
// TaskMap holds *Task values keyed by task name.
//
// Entries live in a sync.Map whose keys are strings (the value returned by
// Task.Name) and whose values are *Task. The key "*" is special-cased by Get,
// which uses it as a fallback when the requested name has no entry.
type TaskMap struct {
	// m maps task name (string) to *Task.
	m sync.Map
}
// Get looks up the task stored under name. If there is no such entry it
// tries the entry stored under "*" instead. It returns nil when neither key
// is present.
func (r *TaskMap) Get(name string) *Task {
	// Candidates are probed in order: the exact name first, then the
	// wildcard key.
	for _, key := range [...]string{name, "*"} {
		entry, found := r.m.Load(key)
		if found {
			return entry.(*Task)
		}
	}
	return nil
}
// Register builds a Task from opt and stores it under the task's name.
//
// opt.init is called first. The task's handler is created from opt.Handler
// and, when opt.FallbackHandler is non-nil, a fallback handler is created
// from it as well. If an entry with the same name is already stored, the
// existing entry is left in place and Register returns a nil task together
// with an error; otherwise it returns the newly stored task.
func (r *TaskMap) Register(opt *TaskOptions) (*Task, error) {
	opt.init()

	created := &Task{
		opt:     opt,
		handler: NewHandler(opt.Handler),
	}
	if fallback := opt.FallbackHandler; fallback != nil {
		created.fallbackHandler = NewHandler(fallback)
	}

	key := created.Name()
	// LoadOrStore only stores when the key is absent, so a duplicate never
	// replaces the task that was registered first.
	if _, exists := r.m.LoadOrStore(key, created); exists {
		return nil, fmt.Errorf("task=%q already exists", key)
	}
	return created, nil
}
// Unregister deletes the entry stored under task's name. The lookup is by
// name only; the stored value is not compared with task.
func (r *TaskMap) Unregister(task *Task) {
	key := task.Name()
	r.m.Delete(key)
}
// Reset removes every entry from the map.
//
// Entries are deleted one by one through the sync.Map API rather than by
// overwriting the field with a fresh sync.Map. Replacing the struct in place
// is an unsynchronized write that races with any concurrent Get, Register,
// Unregister or Range call on the same TaskMap. An entry stored by another
// goroutine while Reset is running may survive the call.
func (r *TaskMap) Reset() {
	r.m.Range(func(key, _ interface{}) bool {
		r.m.Delete(key)
		return true // keep going until every visited key is removed
	})
}
// Range calls fn once per stored entry, passing the entry's key as name and
// its value as task. fn's result is handed back to sync.Map.Range, which
// stops iterating as soon as it receives false.
func (r *TaskMap) Range(fn func(name string, task *Task) bool) {
	visit := func(k, v interface{}) bool {
		name := k.(string)
		task := v.(*Task)
		return fn(name, task)
	}
	r.m.Range(visit)
}
// HandleMessage dispatches msg to the task returned by Get(msg.TaskName).
//
// If no task is found, msg.Delay is set from delay using unknownTaskOpt and
// an "unknown task" error is returned. Otherwise the task's HandleMessage is
// invoked; on failure msg.Delay is set from delay using the task's options
// and the handler's error is returned unchanged. When the task's options
// carry a non-nil DeferFunc, it is deferred before the handler runs and so
// executes when HandleMessage returns.
func (r *TaskMap) HandleMessage(msg *Message) error {
	found := r.Get(msg.TaskName)
	if found == nil {
		msg.Delay = r.delay(msg, nil, unknownTaskOpt)
		return fmt.Errorf("taskq: unknown task=%q", msg.TaskName)
	}

	settings := found.Options()
	if cleanup := settings.DeferFunc; cleanup != nil {
		defer cleanup()
	}

	if err := found.HandleMessage(msg); err != nil {
		msg.Delay = r.delay(msg, err, settings)
		return err
	}
	return nil
}
// delay computes the value HandleMessage assigns to msg.Delay.
//
// It returns 0 once msg.ReservedCount has reached opt.RetryLimit. Otherwise,
// if msgErr itself implements Delayer, its Delay value is returned; the check
// is a plain type assertion, so wrapped errors are not unwrapped. In every
// other case, including a nil msgErr, the result comes from
// exponentialBackoff with opt.MinBackoff, opt.MaxBackoff and
// msg.ReservedCount.
func (r *TaskMap) delay(msg *Message, msgErr error, opt *TaskOptions) time.Duration {
	attempts := msg.ReservedCount
	if attempts >= opt.RetryLimit {
		return 0
	}

	switch custom := msgErr.(type) {
	case Delayer:
		return custom.Delay()
	}
	return exponentialBackoff(opt.MinBackoff, opt.MaxBackoff, attempts)
}