Skip to content

Commit

Permalink
Fixed review points
Browse files Browse the repository at this point in the history
Signed-off-by: Kakuya Ando <[email protected]>
  • Loading branch information
servak committed Aug 17, 2023
1 parent fb8ee82 commit 543dfb9
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 37 deletions.
71 changes: 49 additions & 22 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ var (
wrapCounters = kingpin.Flag("snmp.wrap-large-counters", "Wrap 64-bit counters to avoid floating point rounding.").Default("true").Bool()
srcAddress = kingpin.Flag("snmp.source-address", "Source address to send snmp from in the format 'address:port' to use when connecting targets. If the port parameter is empty or '0', as in '127.0.0.1:' or '[::1]:0', a source port number is automatically (random) chosen.").Default("").String()

SnmpDuration = promauto.NewHistogramVec(
snmpDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "snmp_collection_duration_seconds",
Help: "Duration of collections by the SNMP exporter",
Expand Down Expand Up @@ -89,6 +89,10 @@ func listToOid(l []int) string {
return strings.Join(result, ".")
}

func InitModuleMetrics(auth, module string) {
snmpDuration.WithLabelValues(auth, module)
}

type ScrapeResults struct {
pdus []gosnmp.SnmpPDU
packets uint64
Expand Down Expand Up @@ -364,12 +368,24 @@ type internalMetrics struct {
snmpRetries prometheus.Counter
}

type NamedModule struct {
*config.Module
name string
}

func NewNamedModule(name string, module *config.Module) *NamedModule {
return &NamedModule{
Module: module,
name: name,
}
}

type Collector struct {
ctx context.Context
target string
auth *config.Auth
authName string
modules map[string]*config.Module
modules []*NamedModule
logger log.Logger
metrics internalMetrics
concurrency int
Expand Down Expand Up @@ -414,7 +430,7 @@ func newInternalMetrics(reg prometheus.Registerer) internalMetrics {
}
}

func New(ctx context.Context, target, authName string, auth *config.Auth, modules map[string]*config.Module, logger log.Logger, reg prometheus.Registerer, conc int) *Collector {
func New(ctx context.Context, target, authName string, auth *config.Auth, modules []*NamedModule, logger log.Logger, reg prometheus.Registerer, conc int) *Collector {
internalMetrics := newInternalMetrics(reg)
return &Collector{ctx: ctx, target: target, authName: authName, auth: auth, modules: modules, logger: logger, metrics: internalMetrics, concurrency: conc}
}
Expand All @@ -424,11 +440,11 @@ func (c Collector) Describe(ch chan<- *prometheus.Desc) {
ch <- prometheus.NewDesc("dummy", "dummy", nil, nil)
}

func (c Collector) collect(ch chan<- prometheus.Metric, moduleName string, module *config.Module) {
logger := log.With(c.logger, "module", moduleName)
func (c Collector) collect(ch chan<- prometheus.Metric, module *NamedModule) {
logger := log.With(c.logger, "module", module.name)
start := time.Now()
results, err := ScrapeTarget(c.ctx, c.target, c.auth, module, logger, c.metrics)
moduleLabel := prometheus.Labels{"module": moduleName}
results, err := ScrapeTarget(c.ctx, c.target, c.auth, module.Module, logger, c.metrics)
moduleLabel := prometheus.Labels{"module": module.name}
if err != nil {
level.Info(logger).Log("msg", "Error scraping target", "err", err)
ch <- prometheus.NewInvalidMetric(prometheus.NewDesc("snmp_error", "Error scraping target", nil, moduleLabel), err)
Expand Down Expand Up @@ -485,23 +501,34 @@ PduLoop:

// Collect implements Prometheus.Collector.
func (c Collector) Collect(ch chan<- prometheus.Metric) {
var wg sync.WaitGroup
sem := make(chan struct{}, c.concurrency)
for name, module := range c.modules {
sem <- struct{}{}
wg := sync.WaitGroup{}
workerCount := c.concurrency
if workerCount < 1 {
workerCount = 1
}
workerChan := make(chan *NamedModule)
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func(name string, module *config.Module) {
logger := log.With(c.logger, "module", name)
level.Debug(logger).Log("msg", "Starting scrape")
start := time.Now()
c.collect(ch, name, module)
duration := time.Since(start).Seconds()
level.Debug(logger).Log("msg", "Finished scrape", "duration_seconds", duration)
SnmpDuration.WithLabelValues(c.authName, name).Observe(duration)
<-sem
wg.Done()
}(name, module)
go func() {
defer wg.Done()
for m := range workerChan {
logger := log.With(c.logger, "module", m.name)
level.Debug(logger).Log("msg", "Starting scrape")
start := time.Now()
c.collect(ch, m)
duration := time.Since(start).Seconds()
level.Debug(logger).Log("msg", "Finished scrape", "duration_seconds", duration)
snmpDuration.WithLabelValues(c.authName, m.name).Observe(duration)
}
}()
}

go func() {
for _, module := range c.modules {
workerChan <- module
}
close(workerChan)
}()
wg.Wait()
}

Expand Down
39 changes: 24 additions & 15 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,19 @@ func handler(w http.ResponseWriter, r *http.Request, logger log.Logger) {
if len(queryModule) == 0 {
queryModule = append(queryModule, "if_mib")
}
uniqueM := make(map[string]bool)
var modules []string
for _, qm := range queryModule {
for _, m := range strings.Split(qm, ",") {
if m == "" {
continue
}
if _, ok := uniqueM[m]; !ok {
uniqueM[m] = true
modules = append(modules, m)
}
}
}
sc.RLock()
auth, authOk := sc.C.Auths[authName]
if !authOk {
Expand All @@ -100,25 +113,21 @@ func handler(w http.ResponseWriter, r *http.Request, logger log.Logger) {
snmpRequestErrors.Inc()
return
}
modules := make(map[string]*config.Module)
for _, qm := range queryModule {
for _, m := range strings.Split(qm, ",") {
module, moduleOk := sc.C.Modules[m]
if !moduleOk {
sc.RUnlock()
http.Error(w, fmt.Sprintf("Unknown module '%s'", m), http.StatusBadRequest)
snmpRequestErrors.Inc()
return
}
if _, ok := modules[m]; !ok {
modules[m] = module
}
var nmodules []*collector.NamedModule
for _, m := range modules {
module, moduleOk := sc.C.Modules[m]
if !moduleOk {
sc.RUnlock()
http.Error(w, fmt.Sprintf("Unknown module '%s'", m), http.StatusBadRequest)
snmpRequestErrors.Inc()
return
}
nmodules = append(nmodules, collector.NewNamedModule(m, module))
}
sc.RUnlock()
logger = log.With(logger, "auth", authName, "target", target)
registry := prometheus.NewRegistry()
c := collector.New(r.Context(), target, authName, auth, modules, logger, registry, *concurrency)
c := collector.New(r.Context(), target, authName, auth, nmodules, logger, registry, *concurrency)
registry.MustRegister(c)
// Delegate http serving to Prometheus client library, which will call collector.Collect.
h := promhttp.HandlerFor(registry, promhttp.HandlerOpts{})
Expand Down Expand Up @@ -153,7 +162,7 @@ func (sc *SafeConfig) ReloadConfig(configFile string) (err error) {
// Initialize metrics.
for auth := range sc.C.Auths {
for module := range sc.C.Modules {
collector.SnmpDuration.WithLabelValues(auth, module)
collector.InitModuleMetrics(auth, module)
}
}
sc.Unlock()
Expand Down

0 comments on commit 543dfb9

Please sign in to comment.