Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support fetching multiple modules in one scrape #945

Merged
merged 8 commits into from
Aug 23, 2023
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,25 @@ Note that [URL encoding](https://en.wikipedia.org/wiki/URL_encoding) should be u
to the `:` and `/` characters. Prometheus encodes query parameters automatically and manual encoding
is not necessary within the Prometheus configuration file.

## Multi-Module Handling
The multi-module functionality allows you to specify multiple modules, enabling the retrieval of information from several modules in a single scrape.
The number of modules fetched concurrently per scrape is set with the snmp-exporter option `--snmp.module-concurrency` (the default is 1).

Note: This implementation does not perform any de-duplication of walks between different modules.

There are two ways to specify multiple modules. You can either list them, separated by commas, in a single `module` parameter, or repeat the `module` parameter once per module.
The URLs would look like this:

For comma separation:
```
http://localhost:9116/snmp?module=if_mib,arista_sw&target=192.0.0.8
```

For repeated `module` parameters:
```
http://localhost:9116/snmp?module=if_mib&module=arista_sw&target=192.0.0.8
```

## Configuration

The default configuration file name is `snmp.yml` and should not be edited
Expand Down
103 changes: 83 additions & 20 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"time"

"github.com/alecthomas/kingpin/v2"
Expand All @@ -42,6 +43,14 @@ var (
float64Mantissa uint64 = 9007199254740992
wrapCounters = kingpin.Flag("snmp.wrap-large-counters", "Wrap 64-bit counters to avoid floating point rounding.").Default("true").Bool()
srcAddress = kingpin.Flag("snmp.source-address", "Source address to send snmp from in the format 'address:port' to use when connecting targets. If the port parameter is empty or '0', as in '127.0.0.1:' or '[::1]:0', a source port number is automatically (random) chosen.").Default("").String()

snmpDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Name: "snmp_collection_duration_seconds",
Help: "Duration of collections by the SNMP exporter",
},
[]string{"auth", "module"},
)
)

// Types preceded by an enum with their actual type.
Expand Down Expand Up @@ -80,6 +89,14 @@ func listToOid(l []int) string {
return strings.Join(result, ".")
}

// InitModuleMetrics touches the snmpDuration histogram once for every
// (auth, module) name pair found in the given maps, so that each label
// combination is created up front rather than on its first observation.
// Only the map keys are used; the configuration values are ignored.
func InitModuleMetrics(auths map[string]*config.Auth, modules map[string]*config.Module) {
	for authName := range auths {
		for moduleName := range modules {
			// Looking up the child is enough to instantiate the series.
			_ = snmpDuration.WithLabelValues(authName, moduleName)
		}
	}
}

type ScrapeResults struct {
pdus []gosnmp.SnmpPDU
packets uint64
Expand Down Expand Up @@ -355,13 +372,27 @@ type internalMetrics struct {
snmpRetries prometheus.Counter
}

// NamedModule couples a module configuration with the name it was
// requested under, so the name can travel with the module (it is used
// for the "module" log field and metric label during collection).
type NamedModule struct {
// Embedded module configuration; its fields are promoted onto NamedModule.
*config.Module
// name is the module's name as given by the caller of NewNamedModule.
name string
}
SuperQ marked this conversation as resolved.
Show resolved Hide resolved

// NewNamedModule returns a NamedModule that wraps module and records name
// as its identifier. The module pointer is stored as-is, not copied.
func NewNamedModule(name string, module *config.Module) *NamedModule {
	named := new(NamedModule)
	named.name = name
	named.Module = module
	return named
}

type Collector struct {
ctx context.Context
target string
auth *config.Auth
module *config.Module
logger log.Logger
metrics internalMetrics
ctx context.Context
target string
auth *config.Auth
authName string
modules []*NamedModule
logger log.Logger
metrics internalMetrics
concurrency int
}

func newInternalMetrics(reg prometheus.Registerer) internalMetrics {
Expand Down Expand Up @@ -403,47 +434,48 @@ func newInternalMetrics(reg prometheus.Registerer) internalMetrics {
}
}

func New(ctx context.Context, target string, auth *config.Auth, module *config.Module, logger log.Logger, reg prometheus.Registerer) *Collector {
func New(ctx context.Context, target, authName string, auth *config.Auth, modules []*NamedModule, logger log.Logger, reg prometheus.Registerer, conc int) *Collector {
internalMetrics := newInternalMetrics(reg)
return &Collector{ctx: ctx, target: target, auth: auth, module: module, logger: logger, metrics: internalMetrics}
return &Collector{ctx: ctx, target: target, authName: authName, auth: auth, modules: modules, logger: logger, metrics: internalMetrics, concurrency: conc}
}

// Describe implements Prometheus.Collector.
//
// It emits a single placeholder descriptor named "dummy" instead of
// describing the real metrics (presumably because the actual metric set
// is only known once a target has been scraped; confirm before relying
// on this).
func (c Collector) Describe(ch chan<- *prometheus.Desc) {
	placeholder := prometheus.NewDesc("dummy", "dummy", nil, nil)
	ch <- placeholder
}

// Collect implements Prometheus.Collector.
func (c Collector) Collect(ch chan<- prometheus.Metric) {
func (c Collector) collect(ch chan<- prometheus.Metric, module *NamedModule) {
logger := log.With(c.logger, "module", module.name)
start := time.Now()
results, err := ScrapeTarget(c.ctx, c.target, c.auth, c.module, c.logger, c.metrics)
results, err := ScrapeTarget(c.ctx, c.target, c.auth, module.Module, logger, c.metrics)
moduleLabel := prometheus.Labels{"module": module.name}
if err != nil {
level.Info(c.logger).Log("msg", "Error scraping target", "err", err)
ch <- prometheus.NewInvalidMetric(prometheus.NewDesc("snmp_error", "Error scraping target", nil, nil), err)
level.Info(logger).Log("msg", "Error scraping target", "err", err)
ch <- prometheus.NewInvalidMetric(prometheus.NewDesc("snmp_error", "Error scraping target", nil, moduleLabel), err)
return
}
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc("snmp_scrape_walk_duration_seconds", "Time SNMP walk/bulkwalk took.", nil, nil),
prometheus.NewDesc("snmp_scrape_walk_duration_seconds", "Time SNMP walk/bulkwalk took.", nil, moduleLabel),
prometheus.GaugeValue,
time.Since(start).Seconds())
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc("snmp_scrape_packets_sent", "Packets sent for get, bulkget, and walk; including retries.", nil, nil),
prometheus.NewDesc("snmp_scrape_packets_sent", "Packets sent for get, bulkget, and walk; including retries.", nil, moduleLabel),
prometheus.GaugeValue,
float64(results.packets))
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc("snmp_scrape_packets_retried", "Packets retried for get, bulkget, and walk.", nil, nil),
prometheus.NewDesc("snmp_scrape_packets_retried", "Packets retried for get, bulkget, and walk.", nil, moduleLabel),
prometheus.GaugeValue,
float64(results.retries))
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc("snmp_scrape_pdus_returned", "PDUs returned from get, bulkget, and walk.", nil, nil),
prometheus.NewDesc("snmp_scrape_pdus_returned", "PDUs returned from get, bulkget, and walk.", nil, moduleLabel),
prometheus.GaugeValue,
float64(len(results.pdus)))
oidToPdu := make(map[string]gosnmp.SnmpPDU, len(results.pdus))
for _, pdu := range results.pdus {
oidToPdu[pdu.Name[1:]] = pdu
}

metricTree := buildMetricTree(c.module.Metrics)
metricTree := buildMetricTree(module.Metrics)
// Look for metrics that match each pdu.
PduLoop:
for oid, pdu := range oidToPdu {
Expand All @@ -457,7 +489,7 @@ PduLoop:
}
if head.metric != nil {
// Found a match.
samples := pduToSamples(oidList[i+1:], &pdu, head.metric, oidToPdu, c.logger, c.metrics)
samples := pduToSamples(oidList[i+1:], &pdu, head.metric, oidToPdu, logger, c.metrics)
for _, sample := range samples {
ch <- sample
}
Expand All @@ -466,11 +498,42 @@ PduLoop:
}
}
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc("snmp_scrape_duration_seconds", "Total SNMP time scrape took (walk and processing).", nil, nil),
prometheus.NewDesc("snmp_scrape_duration_seconds", "Total SNMP time scrape took (walk and processing).", nil, moduleLabel),
prometheus.GaugeValue,
time.Since(start).Seconds())
}

// Collect implements Prometheus.Collector.
func (c Collector) Collect(ch chan<- prometheus.Metric) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume there is a unit test for this somewhere?

wg := sync.WaitGroup{}
workerCount := c.concurrency
if workerCount < 1 {
workerCount = 1
}
workerChan := make(chan *NamedModule)
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for m := range workerChan {
logger := log.With(c.logger, "module", m.name)
level.Debug(logger).Log("msg", "Starting scrape")
start := time.Now()
c.collect(ch, m)
duration := time.Since(start).Seconds()
level.Debug(logger).Log("msg", "Finished scrape", "duration_seconds", duration)
snmpDuration.WithLabelValues(c.authName, m.name).Observe(duration)
}
}()
}

for _, module := range c.modules {
workerChan <- module
}
close(workerChan)
wg.Wait()
}

func getPduValue(pdu *gosnmp.SnmpPDU) float64 {
switch pdu.Type {
case gosnmp.Counter64:
Expand Down
73 changes: 36 additions & 37 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import (
_ "net/http/pprof"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"

"github.com/alecthomas/kingpin/v2"
"github.com/go-kit/log"
Expand All @@ -43,20 +43,14 @@ import (
var (
configFile = kingpin.Flag("config.file", "Path to configuration file.").Default("snmp.yml").String()
dryRun = kingpin.Flag("dry-run", "Only verify configuration is valid and exit.").Default("false").Bool()
concurrency = kingpin.Flag("snmp.module-concurrency", "The number of modules to fetch concurrently per scrape").Default("1").Int()
metricsPath = kingpin.Flag(
"web.telemetry-path",
"Path under which to expose metrics.",
).Default("/metrics").String()
toolkitFlags = webflag.AddFlags(kingpin.CommandLine, ":9116")

// Metrics about the SNMP exporter itself.
snmpDuration = promauto.NewSummaryVec(
prometheus.SummaryOpts{
Name: "snmp_collection_duration_seconds",
Help: "Duration of collections by the SNMP exporter",
},
[]string{"auth", "module"},
)
snmpRequestErrors = promauto.NewCounter(
prometheus.CounterOpts{
Name: "snmp_request_errors_total",
Expand Down Expand Up @@ -94,44 +88,50 @@ func handler(w http.ResponseWriter, r *http.Request, logger log.Logger) {
authName = "public_v2"
}

moduleName := query.Get("module")
if len(query["module"]) > 1 {
http.Error(w, "'module' parameter must only be specified once", http.StatusBadRequest)
snmpRequestErrors.Inc()
return
queryModule := query["module"]
if len(queryModule) == 0 {
queryModule = append(queryModule, "if_mib")
}
if moduleName == "" {
moduleName = "if_mib"
uniqueM := make(map[string]bool)
var modules []string
for _, qm := range queryModule {
for _, m := range strings.Split(qm, ",") {
if m == "" {
continue
}
if _, ok := uniqueM[m]; !ok {
uniqueM[m] = true
modules = append(modules, m)
}
}
}

sc.RLock()
auth, authOk := sc.C.Auths[authName]
module, moduleOk := sc.C.Modules[moduleName]
sc.RUnlock()
if !authOk {
sc.RUnlock()
http.Error(w, fmt.Sprintf("Unknown auth '%s'", authName), http.StatusBadRequest)
snmpRequestErrors.Inc()
return
}
if !moduleOk {
http.Error(w, fmt.Sprintf("Unknown module '%s'", moduleName), http.StatusBadRequest)
snmpRequestErrors.Inc()
return
var nmodules []*collector.NamedModule
for _, m := range modules {
module, moduleOk := sc.C.Modules[m]
if !moduleOk {
sc.RUnlock()
http.Error(w, fmt.Sprintf("Unknown module '%s'", m), http.StatusBadRequest)
snmpRequestErrors.Inc()
return
}
nmodules = append(nmodules, collector.NewNamedModule(m, module))
}

logger = log.With(logger, "auth", authName, "module", moduleName, "target", target)
level.Debug(logger).Log("msg", "Starting scrape")

start := time.Now()
sc.RUnlock()
logger = log.With(logger, "auth", authName, "target", target)
registry := prometheus.NewRegistry()
c := collector.New(r.Context(), target, auth, module, logger, registry)
c := collector.New(r.Context(), target, authName, auth, nmodules, logger, registry, *concurrency)
registry.MustRegister(c)
// Delegate http serving to Prometheus client library, which will call collector.Collect.
h := promhttp.HandlerFor(registry, promhttp.HandlerOpts{})
h.ServeHTTP(w, r)
duration := time.Since(start).Seconds()
snmpDuration.WithLabelValues(authName, moduleName).Observe(duration)
level.Debug(logger).Log("msg", "Finished scrape", "duration_seconds", duration)
}

func updateConfiguration(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -160,11 +160,7 @@ func (sc *SafeConfig) ReloadConfig(configFile string) (err error) {
sc.Lock()
sc.C = conf
// Initialize metrics.
for auth := range sc.C.Auths {
for module := range sc.C.Modules {
snmpDuration.WithLabelValues(auth, module)
}
}
collector.InitModuleMetrics(sc.C.Auths, sc.C.Modules)
sc.Unlock()
return nil
}
Expand All @@ -176,8 +172,11 @@ func main() {
kingpin.HelpFlag.Short('h')
kingpin.Parse()
logger := promlog.New(promlogConfig)
if *concurrency < 1 {
*concurrency = 1
}

level.Info(logger).Log("msg", "Starting snmp_exporter", "version", version.Info())
level.Info(logger).Log("msg", "Starting snmp_exporter", "version", version.Info(), "concurrency", concurrency)
SuperQ marked this conversation as resolved.
Show resolved Hide resolved
level.Info(logger).Log("build_context", version.BuildContext())

prometheus.MustRegister(version.NewCollector("snmp_exporter"))
Expand Down