Skip to content

Commit

Permalink
refactor namming influx by ouput or sender
Browse files Browse the repository at this point in the history
  • Loading branch information
Toni Moreno Gimenez committed Mar 8, 2021
1 parent 2bc6ccf commit ad241dc
Showing 1 changed file with 24 additions and 24 deletions.
48 changes: 24 additions & 24 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ var (
mutex sync.RWMutex
// devices is the runtime snmp devices map
devices map[string]*device.SnmpDevice
// influxdb is the runtime devices output db map
influxdb map[string]*output.InfluxDB
// outputdb is the runtime devices output db map
outputdb map[string]*output.InfluxDB

selfmonProc *selfmon.SelfMon
// gatherWg synchronizes device specific goroutines
Expand Down Expand Up @@ -107,23 +107,23 @@ func CheckAndUnSetReloadProcess() bool {
return retval
}

// PrepareInfluxDBs initializes all configured output DBs in the SQL database.
// PrepareOutputDBs initializes all configured output DBs in the SQL database.
// If there is no "default" key, creates a dummy output db which does nothing.
func PrepareInfluxDBs() map[string]*output.InfluxDB {
idb := make(map[string]*output.InfluxDB)
func PrepareOutputDBs() map[string]*output.InfluxDB {
db := make(map[string]*output.InfluxDB)

var defFound bool
for k, c := range DBConfig.Influxdb {
if k == "default" {
defFound = true
}
idb[k] = output.NewNotInitInfluxDB(c)
db[k] = output.NewNotInitInfluxDB(c)
}
if defFound == false {
log.Warn("No Output default found influxdb devices found !!")
idb["default"] = output.DummyDB
log.Warn("No Output default found outputdb devices found !!")
db["default"] = output.DummyDB
}
return idb
return db
}

// GetDevice returns the snmp device with the given id.
Expand Down Expand Up @@ -154,7 +154,7 @@ func GetOutput(id string) (*output.InfluxDB, error) {
}
mutex.RLock()
defer mutex.RUnlock()
if out, ok = influxdb[id]; !ok {
if out, ok = outputdb[id]; !ok {
return nil, fmt.Errorf("There is not any device with id %s running", id)
}
return out, nil
Expand Down Expand Up @@ -188,7 +188,7 @@ func GetOutputJSONInfo(id string) ([]byte, error) {
}
mutex.RLock()
defer mutex.RUnlock()
if out, ok = influxdb[id]; !ok {
if out, ok = outputdb[id]; !ok {
return nil, fmt.Errorf("there is not any device with id %s running", id)
}
return out.ToJSON()
Expand All @@ -198,7 +198,7 @@ func GetOutputJSONInfo(id string) ([]byte, error) {
func GetOutputStats() map[string]*output.InfluxStats {
outstats := make(map[string]*output.InfluxStats)
mutex.RLock()
for k, v := range influxdb {
for k, v := range outputdb {
outstats[k] = v.GetBasicStats()
}
mutex.RUnlock()
Expand All @@ -216,19 +216,19 @@ func GetDeviceStats() map[string]*device.DevStat {
return devstats
}

// StopInfluxOut stops sending data to output influxDB servers.
func StopInfluxOut(idb map[string]*output.InfluxDB) {
// StopOutSenders stops sending data to output influxDB servers.
func StopOutSenders(idb map[string]*output.InfluxDB) {
for k, v := range idb {
log.Infof("Stopping Influxdb out %s", k)
log.Infof("Stopping Sender process %s", k)
v.StopSender()
v.LeaveBus(OutBus)
}
}

// ReleaseInfluxOut closes the influxDB connections and releases the associated resources.
func ReleaseInfluxOut(idb map[string]*output.InfluxDB) {
// ReleaseOutResources closes all Output connections and releases the associated resources.
func ReleaseOutResources(idb map[string]*output.InfluxDB) {
for k, v := range idb {
log.Infof("Release Influxdb resources %s", k)
log.Infof("Release Sender resources %s", k)
v.End()
}
}
Expand Down Expand Up @@ -269,7 +269,7 @@ func initSelfMonitoring(idb map[string]*output.InfluxDB) {

if MainConfig.Selfmon.Enabled {
if val, ok := idb["default"]; ok {
//only executed if a "default" influxdb exist
//only executed if a "default" outputdb exist
val.Init(OutBus)
val.StartSender(&senderWg)

Expand Down Expand Up @@ -325,7 +325,7 @@ func AddDeviceInRuntime(k string, cfg *config.SnmpDeviceCfg) {
dev.SetSelfMonitoring(selfmonProc)

// send a db map to initialize each one its own db if needed
outdb, _ := dev.GetOutSenderFromMap(influxdb)
outdb, _ := dev.GetOutSenderFromMap(outputdb)
outdb.Init(OutBus)
outdb.StartSender(&senderWg)

Expand All @@ -338,10 +338,10 @@ func AddDeviceInRuntime(k string, cfg *config.SnmpDeviceCfg) {
// LoadConf loads the DB conf and initializes the device metric config.
func LoadConf() {
MainConfig.Database.LoadDbConfig(&DBConfig)
influxdb = PrepareInfluxDBs()
outputdb = PrepareOutputDBs()

// begin self monitoring process if needed, before all goroutines
initSelfMonitoring(influxdb)
initSelfMonitoring(outputdb)
config.InitMetricsCfg(&DBConfig)
}

Expand Down Expand Up @@ -372,11 +372,11 @@ func End() (time.Duration, error) {
//log.Info("DEBUG Gather WAIT %+v", GatherWg)
//log.Info("DEBUG SENDER WAIT %+v", senderWg)
// stop all Output Emitter
StopInfluxOut(influxdb)
StopOutSenders(outputdb)
log.Info("END: waiting for all Sender goroutines stop..")
senderWg.Wait()
log.Info("END: releasing Sender Resources")
ReleaseInfluxOut(influxdb)
ReleaseOutResources(outputdb)
log.Infof("END: Finished from %s to %s [Duration : %s]", start.String(), time.Now().String(), time.Since(start).String())
return time.Since(start), nil
}
Expand Down

0 comments on commit ad241dc

Please sign in to comment.