Skip to content

Commit

Permalink
Merge pull request #273 from nats-io/refresh-metrics-on-collect
Browse files Browse the repository at this point in the history
Fix missing metrics for server stats not available when exporter is first started
  • Loading branch information
Jarema authored Jan 15, 2024
2 parents f6cb9e6 + 8888a3a commit e330976
Show file tree
Hide file tree
Showing 11 changed files with 282 additions and 147 deletions.
45 changes: 40 additions & 5 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package collector
import (
"encoding/json"
"io"
"maps"
"net/http"
"strings"
"sync"
Expand Down Expand Up @@ -51,11 +52,12 @@ type metric struct {
// NATSCollector collects NATS metrics
type NATSCollector struct {
sync.Mutex
Stats map[string]metric
httpClient *http.Client
endpoint string
system string
servers []*CollectedServer
Stats map[string]metric
httpClient *http.Client
endpoint string
system string
servers []*CollectedServer
serverRespKeys map[string]struct{}
}

// newPrometheusGaugeVec creates a custom GaugeVec
Expand Down Expand Up @@ -213,6 +215,14 @@ func (nc *NATSCollector) makeRequests() map[string]map[string]interface{} {
Debugf("ignoring server %s: %v", u.ID, err)
delete(resps, u.ID)
}

// verify if there are any new keys in the response that we haven't seen before
keys := mapKeys(response, "")
if !maps.Equal(keys, nc.serverRespKeys) {
Debugf("new keys found in the response from %s, updating metrics", u.URL)
nc.objectToMetrics(response, nc.system)
nc.serverRespKeys = keys
}
resps[u.ID] = response
}
return resps
Expand Down Expand Up @@ -307,6 +317,7 @@ func (nc *NATSCollector) initMetricsFromServers(namespace string) {
}
}

nc.serverRespKeys = mapKeys(response, "")
nc.objectToMetrics(response, namespace)
}

Expand Down Expand Up @@ -395,6 +406,30 @@ func (nc *NATSCollector) objectToMetrics(response map[string]interface{}, namesp
}
}

// mapKeys returns a map of all keys in a map, including nested maps.
// The keys from nested maps are prefixed with the parent key.
func mapKeys(input map[string]any, prefix string) map[string]struct{} {
keys := make(map[string]struct{})

for k, v := range input {
fullKey := k
if prefix != "" {
fullKey = prefix + "_" + k
}

if nestedMap, ok := v.(map[string]any); ok {
nestedKeys := mapKeys(nestedMap, fullKey)
for nestedKey := range nestedKeys {
keys[nestedKey] = struct{}{}
}
} else {
keys[fullKey] = struct{}{}
}
}

return keys
}

func newNatsCollector(system, endpoint string, servers []*CollectedServer) prometheus.Collector {
// TODO: Potentially add TLS config in the transport.
tr := &http.Transport{}
Expand Down
28 changes: 28 additions & 0 deletions collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package collector

import (
"fmt"
"maps"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -728,3 +729,30 @@ func TestReplicatorMetrics(t *testing.T) {
url := "http://127.0.0.1:9922"
verifyCollector(ReplicatorSystem, url, "varz", cases, t)
}

func TestMapKeys(t *testing.T) {
m := map[string]any{
"foo": "bar",
"baz": "quux",
"nested": map[string]any{
"foo": "bar",
"baz": "quux",
"nested": map[string]any{
"foo": "bar",
"baz": "quux",
},
},
}
expected := map[string]struct{}{
"foo": {},
"baz": {},
"nested_foo": {},
"nested_baz": {},
"nested_nested_foo": {},
"nested_nested_baz": {},
}
keys := mapKeys(m, "")
if !maps.Equal(keys, expected) {
t.Fatalf("expected %v, got %v", expected, keys)
}
}
53 changes: 43 additions & 10 deletions exporter/exporter.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2017-2018 The NATS Authors
// Copyright 2017-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand All @@ -15,9 +15,11 @@
package exporter

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/base64"
"errors"
"fmt"
"net"
"net/http"
Expand All @@ -29,6 +31,7 @@ import (

"github.com/nats-io/prometheus-nats-exporter/collector"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/crypto/bcrypt"
)
Expand Down Expand Up @@ -73,9 +76,11 @@ type NATSExporterOptions struct {
// NATSExporter collects NATS metrics
type NATSExporter struct {
sync.Mutex
registry *prometheus.Registry
opts *NATSExporterOptions
doneWg sync.WaitGroup
http net.Listener
http *http.Server
addr string
Collectors []prometheus.Collector
servers []*collector.CollectedServer
mode uint8
Expand Down Expand Up @@ -131,7 +136,7 @@ func (ne *NATSExporter) createCollector(system, endpoint string) {
}

func (ne *NATSExporter) registerCollector(system, endpoint string, nc prometheus.Collector) {
if err := prometheus.Register(nc); err != nil {
if err := ne.registry.Register(nc); err != nil {
if _, ok := err.(prometheus.AlreadyRegisteredError); ok {
collector.Errorf("A collector for this server's metrics has already been registered.")
} else {
Expand Down Expand Up @@ -234,7 +239,7 @@ func (ne *NATSExporter) InitializeCollectors() error {
func (ne *NATSExporter) ClearCollectors() {
if ne.Collectors != nil {
for _, c := range ne.Collectors {
prometheus.Unregister(c)
ne.registry.Unregister(c)
}
ne.Collectors = nil
}
Expand All @@ -247,6 +252,27 @@ func (ne *NATSExporter) Start() error {
if ne.mode == modeStarted {
return nil
}
// Since we are adding metrics in runtime, we need to use a custom registry
// instead of the default one. This is because the default registry is
// global and collectors cannot be properly re-registered after being
// modified.
if ne.registry == nil {
ne.registry = prometheus.NewRegistry()
}
if err := ne.registry.Register(collectors.NewGoCollector()); err != nil {
if errors.As(err, &prometheus.AlreadyRegisteredError{}) {
collector.Debugf("GoCollector already registered")
} else {
return fmt.Errorf("error registering GoCollector: %v", err)
}
}
if err := ne.registry.Register(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})); err != nil {
if errors.As(err, &prometheus.AlreadyRegisteredError{}) {
collector.Debugf("ProcessCollector already registered")
} else {
return fmt.Errorf("error registering GoCollector: %v", err)
}
}

if err := ne.InitializeCollectors(); err != nil {
ne.ClearCollectors()
Expand Down Expand Up @@ -323,7 +349,9 @@ func (ne *NATSExporter) isValidUserPass(user, password string) bool {
// auhtorization has been specificed. Otherwise, it checks
// basic authorization.
func (ne *NATSExporter) getScrapeHandler() http.Handler {
h := promhttp.Handler()
h := promhttp.InstrumentMetricHandler(
ne.registry, promhttp.HandlerFor(ne.registry, promhttp.HandlerOpts{}),
)

if ne.opts.HTTPUser != "" {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -369,6 +397,7 @@ func (ne *NATSExporter) startHTTP() error {
path = "/" + path
}

var listener net.Listener
// If a certificate file has been specified, setup TLS with the
// key provided.
if ne.opts.CertFile != "" {
Expand All @@ -378,11 +407,11 @@ func (ne *NATSExporter) startHTTP() error {
if err != nil {
return err
}
ne.http, err = tls.Listen("tcp", hp, config)
listener, err = tls.Listen("tcp", hp, config)
} else {
proto = "http"
collector.Debugf("No certificate file specified; using http.")
ne.http, err = net.Listen("tcp", hp)
listener, err = net.Listen("tcp", hp)
}

collector.Noticef("Prometheus exporter listening at %s://%s%s", proto, hp, path)
Expand All @@ -401,12 +430,13 @@ func (ne *NATSExporter) startHTTP() error {
MaxHeaderBytes: 1 << 20,
TLSConfig: config,
}
ne.http = srv
ne.addr = listener.Addr().String()

sHTTP := ne.http
go func() {
for i := 0; i < 10; i++ {
var err error
if err = srv.Serve(sHTTP); err != nil {
if err = srv.Serve(listener); err != nil {
// In a test environment, this can fail because the server is
// already running.

Expand Down Expand Up @@ -451,10 +481,13 @@ func (ne *NATSExporter) Stop() {
return
}

if err := ne.http.Close(); err != nil {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := ne.http.Shutdown(ctx); err != nil {
collector.Debugf("Did not close HTTP: %v", err)
}
ne.ClearCollectors()
ne.registry = nil
ne.doneWg.Done()
ne.mode = modeStopped
}
Loading

0 comments on commit e330976

Please sign in to comment.