-
Notifications
You must be signed in to change notification settings - Fork 81
/
Copy pathprocess_watcher.go
148 lines (124 loc) · 4.49 KB
/
process_watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package plugins
import (
"context"
"sync"
"time"
log "github.com/sirupsen/logrus"
"github.com/nginx/agent/sdk/v2/proto"
"github.com/nginx/agent/v2/src/core"
)
// ProcessWatcher polls the data plane for nginx processes and publishes
// messages on the pipeline when master or worker processes appear or vanish.
type ProcessWatcher struct {
	// messagePipeline is the pipeline process events are published to; set by Init.
	messagePipeline core.MessagePipeInterface
	// ticker drives the polling loop in watchProcLoop.
	ticker *time.Ticker
	// seenMasterProcs and seenWorkerProcs hold the last recorded snapshot of
	// master and worker processes, keyed by PID.
	seenMasterProcs, seenWorkerProcs map[int32]core.Process
	// nginxDetails caches the details resolved for each tracked PID.
	nginxDetails map[int32]*proto.NginxDetails
	// wg tracks the background polling goroutine started by Init.
	wg sync.WaitGroup
	// env supplies the current process list via Processes().
	env core.Environment
	// binary resolves the NginxDetails for a given process.
	binary core.NginxBinary
}
// NewProcessWatcher builds a ProcessWatcher that reads processes from env and
// resolves their details through nginxBinary. The ticker is created here with
// a 300ms period; the polling goroutine itself is only started by Init.
func NewProcessWatcher(env core.Environment, nginxBinary core.NginxBinary) *ProcessWatcher {
	const pollInterval = 300 * time.Millisecond

	// wg is left at its zero value, which is a ready-to-use WaitGroup.
	watcher := &ProcessWatcher{
		env:    env,
		binary: nginxBinary,
		ticker: time.NewTicker(pollInterval),
	}
	watcher.seenMasterProcs = map[int32]core.Process{}
	watcher.seenWorkerProcs = map[int32]core.Process{}
	watcher.nginxDetails = map[int32]*proto.NginxDetails{}

	return watcher
}
// Init stores the pipeline, records the nginx processes that are running right
// now (and their details), starts the background polling goroutine, and then
// publishes the initial process list as an NginxDetailProcUpdate message.
func (pw *ProcessWatcher) Init(pipeline core.MessagePipeInterface) {
	log.Info("ProcessWatcher initializing")
	pw.messagePipeline = pipeline

	initialProcs := pw.env.Processes()
	for _, p := range initialProcs {
		// Pick the snapshot map that matches the process role, then record it.
		seen := pw.seenWorkerProcs
		if p.IsMaster {
			seen = pw.seenMasterProcs
		}
		seen[p.Pid] = p

		pw.nginxDetails[p.Pid] = pw.binary.GetNginxDetailsFromProcess(p)
	}

	// The Add here is balanced by the deferred Done in watchProcLoop.
	pw.wg.Add(1)
	go pw.watchProcLoop(pipeline.Context())

	initialUpdate := core.NewMessage(core.NginxDetailProcUpdate, initialProcs)
	pw.messagePipeline.Process(initialUpdate)
}
// Info reports the plugin's name and version.
func (pw *ProcessWatcher) Info() *core.Info {
	const (
		pluginName    = "ProcessWatcher"
		pluginVersion = "v0.0.1"
	)
	return core.NewInfo(pluginName, pluginVersion)
}
// Close releases the watcher's resources when the plugin is shut down.
//
// It stops the polling ticker so no further ticks are delivered. It
// deliberately does not call pw.wg.Done(): the single wg.Add(1) in Init is
// already balanced by the deferred wg.Done() in watchProcLoop, so a second
// Done here would drive the WaitGroup counter negative and panic (as would
// calling Close on a watcher whose Init never ran).
func (pw *ProcessWatcher) Close() {
	log.Info("ProcessWatcher is wrapping up")
	// Guard against a watcher that was not built through NewProcessWatcher.
	if pw.ticker != nil {
		pw.ticker.Stop()
	}
}
// Process is a no-op: the watcher ignores every message passed to it.
func (pw *ProcessWatcher) Process(message *core.Message) {
	// Intentionally empty.
}
// Subscriptions returns an empty, non-nil topic list; the watcher subscribes
// to nothing.
func (pw *ProcessWatcher) Subscriptions() []string {
	topics := make([]string, 0)
	return topics
}
// watchProcLoop runs until ctx is cancelled. On every tick it fetches the
// current process list, diffs it against the last recorded snapshot and, when
// anything changed, publishes the individual change events, stores the new
// snapshot, and publishes the full process list as an NginxDetailProcUpdate.
// It marks pw.wg as done on exit.
func (pw *ProcessWatcher) watchProcLoop(ctx context.Context) {
	defer pw.wg.Done()

	for {
		// Block until either shutdown is requested or the next tick arrives.
		select {
		case <-ctx.Done():
			return
		case <-pw.ticker.C:
		}

		current := pw.env.Processes()
		events, masters, workers := pw.getProcUpdates(current)
		if len(events) == 0 {
			// Nothing changed; keep the existing snapshot.
			continue
		}

		pw.messagePipeline.Process(events...)
		pw.seenMasterProcs, pw.seenWorkerProcs = masters, workers
		pw.messagePipeline.Process(core.NewMessage(core.NginxDetailProcUpdate, current))
	}
}
// getProcUpdates diffs nginxProcs against the previously seen master and
// worker processes.
//
// It returns, in order: the change messages to publish (created events first,
// then killed masters, then killed workers), a PID-keyed map of the currently
// running master processes, and a PID-keyed map of the currently running
// worker processes. As a side effect it adds details for new processes to
// pw.nginxDetails and removes the entries of processes that are gone. It does
// not modify pw.seenMasterProcs or pw.seenWorkerProcs.
func (pw *ProcessWatcher) getProcUpdates(nginxProcs []core.Process) ([]*core.Message, map[int32]core.Process, map[int32]core.Process) {
	events := make([]*core.Message, 0)
	liveMasters := map[int32]core.Process{}
	liveWorkers := map[int32]core.Process{}

	for _, proc := range nginxProcs {
		// Index every running process by role.
		if proc.IsMaster {
			liveMasters[proc.Pid] = proc
		} else {
			liveWorkers[proc.Pid] = proc
		}

		// A PID already present in either snapshot is not new.
		_, knownMaster := pw.seenMasterProcs[proc.Pid]
		_, knownWorker := pw.seenWorkerProcs[proc.Pid]
		if knownMaster || knownWorker {
			continue
		}

		details := pw.binary.GetNginxDetailsFromProcess(proc)
		pw.nginxDetails[proc.Pid] = details

		switch {
		case proc.IsMaster:
			log.Debugf("Processing process change event: new master proc %d", proc.Pid)
			events = append(events, core.NewMessage(core.NginxMasterProcCreated, details))
		default:
			log.Debugf("Processing process change event: new worker proc %d", proc.Pid)
			events = append(events, core.NewMessage(core.NginxWorkerProcCreated, details))
		}
	}

	// Previously seen masters that are no longer running.
	for pid, proc := range pw.seenMasterProcs {
		if _, alive := liveMasters[pid]; alive {
			continue
		}
		log.Debugf("Processing process change event: old master proc killed %d", pid)
		events = append(events, core.NewMessage(core.NginxMasterProcKilled, pw.nginxDetails[proc.Pid]))
		delete(pw.nginxDetails, proc.Pid)
	}

	// Previously seen workers that are no longer running.
	for pid, proc := range pw.seenWorkerProcs {
		if _, alive := liveWorkers[pid]; alive {
			continue
		}
		log.Debugf("Processing process change event: old worker proc killed %d", pid)
		events = append(events, core.NewMessage(core.NginxWorkerProcKilled, pw.nginxDetails[proc.Pid]))
		delete(pw.nginxDetails, proc.Pid)
	}

	return events, liveMasters, liveWorkers
}