-
Notifications
You must be signed in to change notification settings - Fork 52
/
agent.go
362 lines (318 loc) · 10.4 KB
/
agent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
package agent
import (
"fmt"
"sync"
"time"
"github.com/toni-moreno/snmpcollector/pkg/agent/bus"
"github.com/toni-moreno/snmpcollector/pkg/agent/device"
"github.com/toni-moreno/snmpcollector/pkg/agent/output"
"github.com/toni-moreno/snmpcollector/pkg/agent/selfmon"
"github.com/toni-moreno/snmpcollector/pkg/config"
"github.com/toni-moreno/snmpcollector/pkg/data/stats"
"github.com/toni-moreno/snmpcollector/pkg/data/utils"
)
// Build metadata of the running binary.
// NOTE(review): nothing in this file assigns these variables; presumably they
// are injected at build time (e.g. with -ldflags "-X ...") — confirm against
// the build scripts.
var (
	// Version is the app X.Y.Z version.
	Version string
	// Commit is the git commit sha1.
	Commit string
	// Branch is the git branch.
	Branch string
	// BuildStamp is the build timestamp.
	BuildStamp string
)
// swagger:model RInfo
// RInfo contains the agent release and version information.
// GetRInfo fills it from the configured instance ID and the package-level
// build variables.
type RInfo struct {
	// InstanceID is the unique name identifier for this agent.
	InstanceID string
	// Version is the app X.Y.Z version.
	Version string
	// Commit is the git commit sha1.
	Commit string
	// Branch is the git branch.
	Branch string
	// BuildStamp is the build timestamp.
	BuildStamp string
}
// GetRInfo builds a new RInfo from the instance ID found in MainConfig and
// the package-level Version, Commit, Branch and BuildStamp variables.
// A freshly allocated value is returned on every call.
func GetRInfo() *RInfo {
	return &RInfo{
		InstanceID: MainConfig.General.InstanceID,
		Version:    Version,
		Commit:     Commit,
		Branch:     Branch,
		BuildStamp: BuildStamp,
	}
}
// Package-level runtime state of the agent.
var (
	// Bus is the messaging system used to send messages to the devices.
	Bus = bus.NewBus()
	// MainConfig contains the global configuration.
	MainConfig config.Config
	// DBConfig contains the database config.
	DBConfig config.DBConfig
	// log is the logger used by the functions in this file; it is only
	// assigned by SetLogger.
	log utils.Logger
	// reloadMutex guards the reloadProcess flag.
	reloadMutex sync.Mutex
	// reloadProcess is true while ReloadConf is running.
	reloadProcess bool
	// mutex guards the runtime devices map access.
	mutex sync.RWMutex
	// devices is the runtime snmp devices map, keyed by device id.
	devices map[string]*device.SnmpDevice
	// influxdb is the runtime devices output db map; LoadConf rebuilds it
	// through PrepareInfluxDBs.
	influxdb map[string]*output.InfluxDB
	// selfmonProc is the self monitoring process created by initSelfMonitoring.
	selfmonProc *selfmon.SelfMon
	// gatherWg synchronizes device specific goroutines; it is also handed to
	// the self monitoring gather and waited on in End.
	gatherWg sync.WaitGroup
	// senderWg is handed to every started output sender and waited on in End.
	senderWg sync.WaitGroup
)
// SetLogger sets the current log output.
// It replaces the package-level logger used by the functions in this file.
// The assignment is not guarded by any lock.
func SetLogger(l utils.Logger) {
	log = l
}
// Reload Mutex Related Methods.

// CheckReloadProcess reports whether the reloadProcess flag is currently set,
// i.e. whether a configuration reload is in progress.
// The flag is read while holding reloadMutex.
func CheckReloadProcess() bool {
	reloadMutex.Lock()
	inProgress := reloadProcess
	reloadMutex.Unlock()
	return inProgress
}
// CheckAndSetReloadProcess marks a reload as in progress and returns the
// value the reloadProcess flag had before the call. Read and write happen
// under reloadMutex, so only one caller can observe false.
func CheckAndSetReloadProcess() bool {
	reloadMutex.Lock()
	wasRunning := reloadProcess
	reloadProcess = true
	reloadMutex.Unlock()
	return wasRunning
}
// CheckAndUnSetReloadProcess clears the reloadProcess flag and returns the
// value it had before the call. Read and write happen under reloadMutex.
func CheckAndUnSetReloadProcess() bool {
	reloadMutex.Lock()
	wasRunning := reloadProcess
	reloadProcess = false
	reloadMutex.Unlock()
	return wasRunning
}
// PrepareInfluxDBs builds the output map from the InfluxDB entries found in
// DBConfig, creating one output per entry with output.NewNotInitInfluxDB.
// If no entry is keyed "default", a warning is logged and output.DummyDB is
// registered under that key, so the returned map always has a "default".
func PrepareInfluxDBs() map[string]*output.InfluxDB {
	outputs := make(map[string]*output.InfluxDB)
	for name, cfg := range DBConfig.Influxdb {
		outputs[name] = output.NewNotInitInfluxDB(cfg)
	}
	// Every configured key was copied above, so a missing "default" entry
	// here means none was configured.
	if _, hasDefault := outputs["default"]; !hasDefault {
		log.Warn("No Output default found influxdb devices found !!")
		outputs["default"] = output.DummyDB
	}
	return outputs
}
// GetDevice looks up the runtime snmp device registered under id.
// It returns an error, without touching the devices map, when a reload is in
// progress, and also when no device with that id is present.
// The lookup is done under the read lock of mutex.
func GetDevice(id string) (*device.SnmpDevice, error) {
	if CheckReloadProcess() {
		log.Warning("There is a reload process running while trying to get device info")
		return nil, fmt.Errorf("There is a reload process running.... please wait until finished ")
	}

	mutex.RLock()
	defer mutex.RUnlock()

	found, exists := devices[id]
	if !exists {
		return nil, fmt.Errorf("There is not any device with id %s running", id)
	}
	return found, nil
}
// GetDeviceJSONInfo returns the device data in JSON format.
// Returns an error if there is an ongoing reload.
func GetDeviceJSONInfo(id string) ([]byte, error) {
var dev *device.SnmpDevice
var ok bool
if CheckReloadProcess() == true {
log.Warning("There is a reload process running while trying to get device info")
return nil, fmt.Errorf("There is a reload process running.... please wait until finished ")
}
mutex.RLock()
defer mutex.RUnlock()
if dev, ok = devices[id]; !ok {
return nil, fmt.Errorf("there is not any device with id %s running", id)
}
return dev.ToJSON()
}
// GetDevStats collects GetBasicStats from every runtime device and returns
// the results keyed by device id. The devices map is iterated under the read
// lock of mutex. Unlike GetDevice, it does not check for an ongoing reload.
func GetDevStats() map[string]*stats.GatherStats {
	result := map[string]*stats.GatherStats{}

	mutex.RLock()
	for id, dev := range devices {
		result[id] = dev.GetBasicStats()
	}
	mutex.RUnlock()

	return result
}
// StopInfluxOut calls StopSender on every output in idb, logging the key of
// each one before stopping it.
func StopInfluxOut(idb map[string]*output.InfluxDB) {
	for name, out := range idb {
		log.Infof("Stopping Influxdb out %s", name)
		out.StopSender()
	}
}
// ReleaseInfluxOut calls End on every output in idb, logging the key of each
// one before releasing it.
func ReleaseInfluxOut(idb map[string]*output.InfluxDB) {
	for name, out := range idb {
		log.Infof("Release Influxdb resources %s", name)
		out.End()
	}
}
// DeviceProcessStop stops all device polling goroutines
func DeviceProcessStop() {
Bus.Broadcast(&bus.Message{Type: bus.Exit})
}
// DeviceProcessStart starts all device polling goroutines
func DeviceProcessStart() {
mutex.Lock()
devices = make(map[string]*device.SnmpDevice)
mutex.Unlock()
for k, c := range DBConfig.SnmpDevice {
AddDeviceInRuntime(k, c)
}
}
// init launches Bus.Start in its own goroutine when the package is loaded.
// NOTE(review): presumably Bus.Start runs the bus message loop for the whole
// process lifetime; nothing in this file stops it — confirm in package bus.
func init() {
	go Bus.Start()
}
// initSelfMonitoring creates the package-level selfmonProc and, when self
// monitoring is enabled in MainConfig, wires it to the "default" output in
// idb and starts its gather process.
// If self monitoring is enabled but idb has no "default" entry, it is
// disabled in MainConfig and an error is logged.
func initSelfMonitoring(idb map[string]*output.InfluxDB) {
	log.Debugf("INFLUXDB2: %+v", idb)
	selfmonProc = selfmon.NewNotInit(&MainConfig.Selfmon)

	if !MainConfig.Selfmon.Enabled {
		log.Printf("SELFMON disabled %+v\n", MainConfig.Selfmon)
		return
	}

	defaultOut, found := idb["default"]
	if !found {
		MainConfig.Selfmon.Enabled = false
		log.Errorf("SELFMON disabled becaouse of no default db found !!! SELFMON[ %+v ] INFLUXLIST[ %+v]\n", MainConfig.Selfmon, idb)
		return
	}

	// From here on a "default" influxdb exists: bring it up first, then the
	// self monitoring process that writes to it.
	defaultOut.Init()
	defaultOut.StartSender(&senderWg)
	selfmonProc.Init()
	selfmonProc.SetOutDB(idb)
	selfmonProc.SetOutput(defaultOut)
	log.Printf("SELFMON enabled %+v", MainConfig.Selfmon)
	// Begin the statistic reporting
	selfmonProc.StartGather(&gatherWg)
}
// IsDeviceInRuntime reports whether a device with the given id is present in
// the runtime devices map.
//
// The lookup only reads the map, so it takes the read side of mutex, like
// GetDevice and GetDevStats do, instead of the exclusive write lock.
func IsDeviceInRuntime(id string) bool {
	mutex.RLock()
	defer mutex.RUnlock()
	_, ok := devices[id]
	return ok
}
// DeleteDeviceInRuntime stops the gather process of device `id` and removes
// it from the runtime devices map.
// The write lock of mutex is held for the whole call, including StopGather,
// so the map cannot change while the device is being removed.
// The returned error is always nil; an unknown id is only logged.
func DeleteDeviceInRuntime(id string) error {
	mutex.Lock()
	defer mutex.Unlock()

	target, found := devices[id]
	if !found {
		log.Errorf("There is no %s device in the runtime device list", id)
		return nil
	}

	// Stop all device processes and its measurements. Once finished they will
	// be removed from the bus and node closed (snmp connections for
	// measurements will be closed)
	target.StopGather()
	log.Debugf("Bus retuned from the exit message to the ID device %s", id)
	delete(devices, id)
	return nil
}
// AddDeviceInRuntime creates the SNMP device described by cfg, registers it
// in the global devices map under key k and starts its gather goroutine.
//
// The device is attached to the Bus, given the variable catalog and the self
// monitoring process, and bound to an output taken from the influxdb map.
// If no output can be resolved for the device, the error is logged, the
// device is detached from the Bus again and it is NOT added to the runtime.
func AddDeviceInRuntime(k string, cfg *config.SnmpDeviceCfg) {
	dev := device.New(cfg)
	dev.AttachToBus(Bus)
	dev.InitCatalogVar(DBConfig.VarCatalog)
	dev.SetSelfMonitoring(selfmonProc)

	// send a db map to initialize each one its own db if needed
	outdb, err := dev.GetOutSenderFromMap(influxdb)
	if err != nil {
		log.Errorf("Device %s not added to runtime, no output sender available: %s", k, err)
		// The device will never run, so leave the bus so it won't get blocked
		// trying to send messages to a not running device.
		dev.LeaveBus(Bus)
		return
	}
	outdb.Init()
	outdb.StartSender(&senderWg)

	mutex.Lock()
	defer mutex.Unlock()
	devices[k] = dev

	// Start gather goroutine for device and add it to the wait group for
	// gather goroutines
	gatherWg.Add(1)
	go func() {
		defer gatherWg.Done()
		dev.StartGather()
		log.Infof("Device %s finished", cfg.ID)
		// If device goroutine has finished, leave the bus so it won't get
		// blocked trying to send messages to a not running device.
		dev.LeaveBus(Bus)
	}()
}
// LoadConf loads the DB conf and initializes the device metric config.
//
// The calls run in a fixed order: DBConfig is passed to
// MainConfig.Database.LoadDbConfig (presumably filling it — confirm in
// package config), the package-level influxdb map is rebuilt from it, self
// monitoring is set up on that map, and finally config.InitMetricsCfg is
// called with the loaded configuration.
func LoadConf() {
	MainConfig.Database.LoadDbConfig(&DBConfig)
	// PrepareInfluxDBs reads DBConfig, so it must run after the load above.
	influxdb = PrepareInfluxDBs()
	// begin self monitoring process if needed, before all goroutines
	initSelfMonitoring(influxdb)
	config.InitMetricsCfg(&DBConfig)
}
// Start loads the agent configuration and starts it.
// Configuration is loaded first because DeviceProcessStart iterates over the
// devices in DBConfig and uses the influxdb map that LoadConf builds.
func Start() {
	LoadConf()
	DeviceProcessStart()
}
// End stops all devices polling.
//
// The shutdown runs in a fixed order: broadcast the exit message to the
// devices, stop the selfmon gather, wait for all gather goroutines, release
// the selfmon resources, stop the output senders, wait for the sender
// goroutines and finally release the output resources.
//
// It returns the time elapsed since the call started. The error result is
// always nil in this implementation.
func End() (time.Duration, error) {
	start := time.Now()
	log.Infof("END: begin device Gather processes stop... at %s", start.String())
	// Stop all device processes and its measurements. Once finished they will be removed
	// from the bus and node closed (snmp connections for measurements will be closed)
	DeviceProcessStop()
	log.Info("END: begin selfmon Gather processes stop...")
	// stop the selfmon process
	selfmonProc.StopGather()
	log.Info("END: waiting for all Gather goroutines stop...")
	// Block until every goroutine registered on gatherWg has called Done.
	gatherWg.Wait()
	log.Info("END: releasing Selfmonitoring Resources")
	selfmonProc.End()
	log.Info("END: begin sender processes stop...")
	// log.Info("DEBUG Gather WAIT %+v", GatherWg)
	// log.Info("DEBUG SENDER WAIT %+v", senderWg)
	// Stop all output emitters only after the gather goroutines are done.
	StopInfluxOut(influxdb)
	log.Info("END: waiting for all Sender goroutines stop..")
	senderWg.Wait()
	log.Info("END: releasing Sender Resources")
	ReleaseInfluxOut(influxdb)
	log.Infof("END: Finished from %s to %s [Duration : %s]", start.String(), time.Now().String(), time.Since(start).String())
	return time.Since(start), nil
}
// ReloadConf stops the polling, reloads all configuration and restarts the
// polling.
//
// Only one reload can run at a time: if another one is already in progress
// the call returns immediately with an error. Otherwise it runs End,
// LoadConf and DeviceProcessStart in that order.
//
// It returns the time elapsed since the call started, and a non-nil error
// only when another reload was already running.
func ReloadConf() (time.Duration, error) {
	start := time.Now()
	if CheckAndSetReloadProcess() {
		log.Warnf("RELOADCONF: There is another reload process running while trying to reload at %s ", start.String())
		return time.Since(start), fmt.Errorf("There is another reload process running.... please wait until finished ")
	}
	// Clear the reload flag on every exit path, including a panic in one of
	// the steps below; otherwise the flag would stay set and GetDevice and
	// later reloads would be rejected forever.
	defer CheckAndUnSetReloadProcess()

	log.Infof("RELOADCONF INIT: begin device Gather processes stop... at %s", start.String())
	if _, err := End(); err != nil {
		// The reload continues: the configuration still has to be loaded again.
		log.Errorf("RELOADCONF: error while stopping the running processes: %s", err)
	}

	log.Info("RELOADCONF: loading configuration Again...")
	LoadConf()

	log.Info("RELOADCONF: Starting all device processes again...")
	// Initialize Devices in Runtime map
	DeviceProcessStart()

	log.Infof("RELOADCONF END: Finished from %s to %s [Duration : %s]", start.String(), time.Now().String(), time.Since(start).String())
	return time.Since(start), nil
}