Skip to content

Commit

Permalink
Add getting server id from /varz for the server
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <[email protected]>
  • Loading branch information
wallyqs committed Jul 12, 2019
1 parent 43acc39 commit e3c1626
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 7 deletions.
52 changes: 52 additions & 0 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"net/http"
"strings"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
)
Expand Down Expand Up @@ -80,6 +81,57 @@ func getMetricURL(httpClient *http.Client, URL string, response interface{}) err
return json.Unmarshal(body, &response)
}

// GetServerIDFromVarz gets the server ID from the server.
func GetServerIDFromVarz(endpoint string, retryInterval time.Duration) string {
getServerID := func() (string, error) {
resp, err := http.DefaultClient.Get(endpoint + "/varz")
if err != nil {
return "", err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", err
}
var response map[string]interface{}
err = json.Unmarshal(body, &response)
if err != nil {
return "", err
}
serverID, ok := response["server_id"]
if !ok {
Fatalf("Could not find server id in /varz")
}
id, ok := serverID.(string)
if !ok {
Fatalf("Invalid server_id type in /varz: %+v", serverID)
}

return id, nil
}

var id string
var err error
id, err = getServerID()
if err == nil {
return id
}

// Retry periodically until available, in case it never starts
// then a liveness check against the NATS Server itself should
// detect that an restart the server, in terms of the exporter
// we just wait for it to eventually be available.
for range time.NewTicker(retryInterval).C {
id, err = getServerID()
if err != nil {
Errorf("Could not find server id: %s", err)
continue
}
break
}
return id
}

// Describe the metric to the Prometheus server.
func (nc *NATSCollector) Describe(ch chan<- *prometheus.Desc) {
nc.Lock()
Expand Down
11 changes: 11 additions & 0 deletions collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,17 @@ func getLabelValues(url string, endpoint string, metricNames []string) (map[stri
}
}

func TestServerIDFromVarz(t *testing.T) {
s := pet.RunServer()
defer s.Shutdown()

url := fmt.Sprintf("http://localhost:%d/", pet.MonitorPort)
result := GetServerIDFromVarz(url, 2*time.Second)
if len(result) < 1 || result[0] != 'N' {
t.Fatalf("Unexpected server id: %v", result)
}
}

func TestVarz(t *testing.T) {
s := pet.RunServer()
defer s.Shutdown()
Expand Down
1 change: 1 addition & 0 deletions exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type NATSExporterOptions struct {
HTTPUser string // User in metrics scrape by prometheus.
HTTPPassword string
Prefix string
UseOldServerID bool
}

//NATSExporter collects NATS metrics
Expand Down
26 changes: 19 additions & 7 deletions prometheus_nats_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/nats-io/prometheus-nats-exporter/exporter"
)

var version = "0.4.0"
var version = "0.5.0"

// parseServerIDAndURL parses the url argument the optional id for the server ID.
func parseServerIDAndURL(urlArg string) (string, string, error) {
Expand Down Expand Up @@ -118,6 +118,7 @@ func main() {
flag.StringVar(&opts.HTTPUser, "http_user", "", "Enable basic auth and set user name for HTTP scrapes.")
flag.StringVar(&opts.HTTPPassword, "http_pass", "", "Set the password for HTTP scrapes. NATS bcrypt supported.")
flag.StringVar(&opts.Prefix, "prefix", "", "Set the prefix for all the metrics.")
flag.BoolVar(&opts.UseOldServerID, "use_old_server_id", false, "Disables using ServerID from /varz")
flag.Parse()

opts.RetryInterval = time.Duration(retryInterval) * time.Second
Expand Down Expand Up @@ -146,15 +147,26 @@ necessary.`)
// Create an instance of the NATS exporter.
exp := exporter.NewExporter(opts)

// For each URL specified, add the NATS server with the optional ID.
for _, arg := range flag.Args() {
id, url, err := parseServerIDAndURL(arg)
if err != nil {
collector.Fatalf("Unable to parse URL %q: %v", arg, err)
}
if len(args) == 1 && !opts.UseOldServerID {
// Pick the server id from the /varz endpoint info.
url := flag.Args()[0]
id := collector.GetServerIDFromVarz(url, opts.RetryInterval)
if err := exp.AddServer(id, url); err != nil {
collector.Fatalf("Unable to setup server in exporter: %s, %s: %v", id, url, err)
}
} else {
// For each URL specified, add the NATS server with the optional ID.
for _, arg := range args {
// This should make the http request to get the server id
// that is returned from /varz.
id, url, err := parseServerIDAndURL(arg)
if err != nil {
collector.Fatalf("Unable to parse URL %q: %v", arg, err)
}
if err := exp.AddServer(id, url); err != nil {
collector.Fatalf("Unable to setup server in exporter: %s, %s: %v", id, url, err)
}
}
}

// Start the exporter.
Expand Down

0 comments on commit e3c1626

Please sign in to comment.