Skip to content

Commit

Permalink
Implemented a feature to fetch from multiple modules
Browse files Browse the repository at this point in the history
Signed-off-by: Kakuya Ando <[email protected]>
  • Loading branch information
servak committed Aug 14, 2023
1 parent d4f9da1 commit 76fcc8f
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 54 deletions.
74 changes: 54 additions & 20 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/alecthomas/kingpin/v2"
Expand All @@ -42,6 +43,14 @@ var (
float64Mantissa uint64 = 9007199254740992
wrapCounters = kingpin.Flag("snmp.wrap-large-counters", "Wrap 64-bit counters to avoid floating point rounding.").Default("true").Bool()
srcAddress = kingpin.Flag("snmp.source-address", "Source address to send snmp from in the format 'address:port' to use when connecting targets. If the port parameter is empty or '0', as in '127.0.0.1:' or '[::1]:0', a source port number is automatically (random) chosen.").Default("").String()

SnmpDuration = promauto.NewSummaryVec(
prometheus.SummaryOpts{
Name: "snmp_collection_duration_seconds",
Help: "Duration of collections by the SNMP exporter",
},
[]string{"auth", "module"},
)
)

// Types preceded by an enum with their actual type.
Expand Down Expand Up @@ -356,12 +365,14 @@ type internalMetrics struct {
}

type Collector struct {
ctx context.Context
target string
auth *config.Auth
module *config.Module
logger log.Logger
metrics internalMetrics
ctx context.Context
target string
auth *config.Auth
authName string
modules map[string]*config.Module
logger log.Logger
metrics internalMetrics
concurrency int
}

func newInternalMetrics(reg prometheus.Registerer) internalMetrics {
Expand Down Expand Up @@ -403,47 +414,48 @@ func newInternalMetrics(reg prometheus.Registerer) internalMetrics {
}
}

func New(ctx context.Context, target string, auth *config.Auth, module *config.Module, logger log.Logger, reg prometheus.Registerer) *Collector {
func New(ctx context.Context, target, authName string, auth *config.Auth, modules map[string]*config.Module, logger log.Logger, reg prometheus.Registerer, conc int) *Collector {
internalMetrics := newInternalMetrics(reg)
return &Collector{ctx: ctx, target: target, auth: auth, module: module, logger: logger, metrics: internalMetrics}
return &Collector{ctx: ctx, target: target, authName: authName, auth: auth, modules: modules, logger: logger, metrics: internalMetrics, concurrency: conc}
}

// Describe implements Prometheus.Collector.
func (c Collector) Describe(ch chan<- *prometheus.Desc) {
ch <- prometheus.NewDesc("dummy", "dummy", nil, nil)
}

// Collect implements Prometheus.Collector.
func (c Collector) Collect(ch chan<- prometheus.Metric) {
func (c Collector) collect(ch chan<- prometheus.Metric, moduleName string, module *config.Module) {
logger := log.With(c.logger, "module", moduleName)
start := time.Now()
results, err := ScrapeTarget(c.ctx, c.target, c.auth, c.module, c.logger, c.metrics)
results, err := ScrapeTarget(c.ctx, c.target, c.auth, module, logger, c.metrics)
moduleLabel := prometheus.Labels{"module": moduleName}
if err != nil {
level.Info(c.logger).Log("msg", "Error scraping target", "err", err)
ch <- prometheus.NewInvalidMetric(prometheus.NewDesc("snmp_error", "Error scraping target", nil, nil), err)
level.Info(logger).Log("msg", "Error scraping target", "err", err)
ch <- prometheus.NewInvalidMetric(prometheus.NewDesc("snmp_error", "Error scraping target", nil, moduleLabel), err)
return
}
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc("snmp_scrape_walk_duration_seconds", "Time SNMP walk/bulkwalk took.", nil, nil),
prometheus.NewDesc("snmp_scrape_walk_duration_seconds", "Time SNMP walk/bulkwalk took.", nil, moduleLabel),
prometheus.GaugeValue,
time.Since(start).Seconds())
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc("snmp_scrape_packets_sent", "Packets sent for get, bulkget, and walk; including retries.", nil, nil),
prometheus.NewDesc("snmp_scrape_packets_sent", "Packets sent for get, bulkget, and walk; including retries.", nil, moduleLabel),
prometheus.GaugeValue,
float64(results.packets))
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc("snmp_scrape_packets_retried", "Packets retried for get, bulkget, and walk.", nil, nil),
prometheus.NewDesc("snmp_scrape_packets_retried", "Packets retried for get, bulkget, and walk.", nil, moduleLabel),
prometheus.GaugeValue,
float64(results.retries))
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc("snmp_scrape_pdus_returned", "PDUs returned from get, bulkget, and walk.", nil, nil),
prometheus.NewDesc("snmp_scrape_pdus_returned", "PDUs returned from get, bulkget, and walk.", nil, moduleLabel),
prometheus.GaugeValue,
float64(len(results.pdus)))
oidToPdu := make(map[string]gosnmp.SnmpPDU, len(results.pdus))
for _, pdu := range results.pdus {
oidToPdu[pdu.Name[1:]] = pdu
}

metricTree := buildMetricTree(c.module.Metrics)
metricTree := buildMetricTree(module.Metrics)
// Look for metrics that match each pdu.
PduLoop:
for oid, pdu := range oidToPdu {
Expand All @@ -457,7 +469,7 @@ PduLoop:
}
if head.metric != nil {
// Found a match.
samples := pduToSamples(oidList[i+1:], &pdu, head.metric, oidToPdu, c.logger, c.metrics)
samples := pduToSamples(oidList[i+1:], &pdu, head.metric, oidToPdu, logger, c.metrics)
for _, sample := range samples {
ch <- sample
}
Expand All @@ -466,11 +478,33 @@ PduLoop:
}
}
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc("snmp_scrape_duration_seconds", "Total SNMP time scrape took (walk and processing).", nil, nil),
prometheus.NewDesc("snmp_scrape_duration_seconds", "Total SNMP time scrape took (walk and processing).", nil, moduleLabel),
prometheus.GaugeValue,
time.Since(start).Seconds())
}

// Collect implements Prometheus.Collector.
func (c Collector) Collect(ch chan<- prometheus.Metric) {
var wg sync.WaitGroup
sem := make(chan struct{}, c.concurrency)
for name, module := range c.modules {
sem <- struct{}{}
wg.Add(1)
go func(name string, module *config.Module) {
logger := log.With(c.logger, "module", name)
level.Debug(logger).Log("msg", "Starting scrape")
start := time.Now()
c.collect(ch, name, module)
duration := time.Since(start).Seconds()
level.Debug(logger).Log("msg", "Finished scrape", "duration_seconds", duration)
SnmpDuration.WithLabelValues(c.authName, name).Observe(duration)
<-sem
wg.Done()
}(name, module)
}
wg.Wait()
}

func getPduValue(pdu *gosnmp.SnmpPDU) float64 {
switch pdu.Type {
case gosnmp.Counter64:
Expand Down
57 changes: 23 additions & 34 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"os/signal"
"sync"
"syscall"
"time"

"github.com/alecthomas/kingpin/v2"
"github.com/go-kit/log"
Expand All @@ -43,20 +42,14 @@ import (
var (
configFile = kingpin.Flag("config.file", "Path to configuration file.").Default("snmp.yml").String()
dryRun = kingpin.Flag("dry-run", "Only verify configuration is valid and exit.").Default("false").Bool()
concurrency = kingpin.Flag("concurrency", "Specify the number of modules to fetch concurrently").Default("1").Int()
metricsPath = kingpin.Flag(
"web.telemetry-path",
"Path under which to expose metrics.",
).Default("/metrics").String()
toolkitFlags = webflag.AddFlags(kingpin.CommandLine, ":9116")

// Metrics about the SNMP exporter itself.
snmpDuration = promauto.NewSummaryVec(
prometheus.SummaryOpts{
Name: "snmp_collection_duration_seconds",
Help: "Duration of collections by the SNMP exporter",
},
[]string{"auth", "module"},
)
snmpRequestErrors = promauto.NewCounter(
prometheus.CounterOpts{
Name: "snmp_request_errors_total",
Expand Down Expand Up @@ -94,44 +87,37 @@ func handler(w http.ResponseWriter, r *http.Request, logger log.Logger) {
authName = "public_v2"
}

moduleName := query.Get("module")
if len(query["module"]) > 1 {
http.Error(w, "'module' parameter must only be specified once", http.StatusBadRequest)
snmpRequestErrors.Inc()
return
queryModule := query["module"]
if len(queryModule) == 0 {
queryModule = append(queryModule, "if_mib")
}
if moduleName == "" {
moduleName = "if_mib"
}

sc.RLock()
auth, authOk := sc.C.Auths[authName]
module, moduleOk := sc.C.Modules[moduleName]
sc.RUnlock()
if !authOk {
sc.RUnlock()
http.Error(w, fmt.Sprintf("Unknown auth '%s'", authName), http.StatusBadRequest)
snmpRequestErrors.Inc()
return
}
if !moduleOk {
http.Error(w, fmt.Sprintf("Unknown module '%s'", moduleName), http.StatusBadRequest)
snmpRequestErrors.Inc()
return
modules := make(map[string]*config.Module)
for _, m := range queryModule {
module, moduleOk := sc.C.Modules[m]
if !moduleOk {
sc.RUnlock()
http.Error(w, fmt.Sprintf("Unknown module '%s'", m), http.StatusBadRequest)
snmpRequestErrors.Inc()
return
}
modules[m] = module
}

logger = log.With(logger, "auth", authName, "module", moduleName, "target", target)
level.Debug(logger).Log("msg", "Starting scrape")

start := time.Now()
sc.RUnlock()
logger = log.With(logger, "auth", authName, "target", target)
registry := prometheus.NewRegistry()
c := collector.New(r.Context(), target, auth, module, logger, registry)
c := collector.New(r.Context(), target, authName, auth, modules, logger, registry, *concurrency)
registry.MustRegister(c)
// Delegate http serving to Prometheus client library, which will call collector.Collect.
h := promhttp.HandlerFor(registry, promhttp.HandlerOpts{})
h.ServeHTTP(w, r)
duration := time.Since(start).Seconds()
snmpDuration.WithLabelValues(authName, moduleName).Observe(duration)
level.Debug(logger).Log("msg", "Finished scrape", "duration_seconds", duration)
}

func updateConfiguration(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -162,7 +148,7 @@ func (sc *SafeConfig) ReloadConfig(configFile string) (err error) {
// Initialize metrics.
for auth := range sc.C.Auths {
for module := range sc.C.Modules {
snmpDuration.WithLabelValues(auth, module)
collector.SnmpDuration.WithLabelValues(auth, module)
}
}
sc.Unlock()
Expand All @@ -176,8 +162,11 @@ func main() {
kingpin.HelpFlag.Short('h')
kingpin.Parse()
logger := promlog.New(promlogConfig)
if *concurrency < 1 {
*concurrency = 1
}

level.Info(logger).Log("msg", "Starting snmp_exporter", "version", version.Info())
level.Info(logger).Log("msg", "Starting snmp_exporter", "version", version.Info(), "concurrency", concurrency)
level.Info(logger).Log("build_context", version.BuildContext())

prometheus.MustRegister(version.NewCollector("snmp_exporter"))
Expand Down

0 comments on commit 76fcc8f

Please sign in to comment.