Skip to content

Commit

Permalink
Merge pull request #211 from mcosta74/feat/use-server-name
Browse files Browse the repository at this point in the history
Feat/use server name
  • Loading branch information
wallyqs authored Mar 6, 2023
2 parents a9d9d46 + adad172 commit 311823a
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 26 deletions.
41 changes: 25 additions & 16 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,20 @@ func getMetricURL(httpClient *http.Client, url string, response interface{}) err

// GetServerIDFromVarz gets the server ID from the server.
func GetServerIDFromVarz(endpoint string, retryInterval time.Duration) string {
getServerID := func() (string, error) {
return getServerKeyFromVarz(endpoint, retryInterval, "server_id")
}

// GetServerNameFromVarz gets the server name from the server.
func GetServerNameFromVarz(endpoint string, retryInterval time.Duration) string {
return getServerKeyFromVarz(endpoint, retryInterval, "server_name")
}

func getServerKeyFromVarz(endpoint string, retryInterval time.Duration, key string) string {
// Retry periodically until available, in case it never starts
// then a liveness check against the NATS Server itself should
// detect that an restart the server, in terms of the exporter
// we just wait for it to eventually be available.
getServerVarzValue := func() (string, error) {
resp, err := http.DefaultClient.Get(endpoint + "/varz")
if err != nil {
return "", err
Expand All @@ -137,38 +150,34 @@ func GetServerIDFromVarz(endpoint string, retryInterval time.Duration) string {
if err != nil {
return "", err
}
serverID, ok := response["server_id"]
serverVarzValue, ok := response[key]
if !ok {
Fatalf("Could not find server id in /varz")
Fatalf("Could not find %s in /varz", key)
}
id, ok := serverID.(string)
varzValue, ok := serverVarzValue.(string)
if !ok {
Fatalf("Invalid server_id type in /varz: %+v", serverID)
Fatalf("Invalid %s type in /varz: %+v", key, serverVarzValue)
}

return id, nil
return varzValue, nil
}

var id string
var varzValue string
var err error
id, err = getServerID()
varzValue, err = getServerVarzValue()
if err == nil {
return id
return varzValue
}

// Retry periodically until available, in case it never starts
// then a liveness check against the NATS Server itself should
// detect that an restart the server, in terms of the exporter
// we just wait for it to eventually be available.
for range time.NewTicker(retryInterval).C {
id, err = getServerID()
varzValue, err = getServerVarzValue()
if err != nil {
Errorf("Could not find server id: %s", err)
Errorf("Could not find %s: %s", key, err)
continue
}
break
}
return id
return varzValue
}

// Describe the metric to the Prometheus server.
Expand Down
12 changes: 12 additions & 0 deletions collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,18 @@ func TestServerIDFromVarz(t *testing.T) {
}
}

func TestServerNameFromVarz(t *testing.T) {
serverName := "My Awesome Server Name"
s := pet.RunServerWithName(serverName)
defer s.Shutdown()

url := fmt.Sprintf("http://localhost:%d/", pet.MonitorPort)
result := GetServerNameFromVarz(url, 2*time.Second)
if result != serverName {
t.Fatalf("Unexpected server name: %v", result)
}
}

func TestVarz(t *testing.T) {
s := pet.RunServer()
defer s.Shutdown()
Expand Down
1 change: 1 addition & 0 deletions exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type NATSExporterOptions struct {
HTTPPassword string
Prefix string
UseInternalServerID bool
UseServerName bool
}

// NATSExporter collects NATS metrics
Expand Down
15 changes: 13 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func main() {
flag.StringVar(&opts.HTTPPassword, "http_pass", "", "Set the password for HTTP scrapes. NATS bcrypt supported.")
flag.StringVar(&opts.Prefix, "prefix", "", "Replace the default prefix for all the metrics.")
flag.BoolVar(&opts.UseInternalServerID, "use_internal_server_id", false, "Enables using ServerID from /varz")
flag.BoolVar(&opts.UseServerName, "use_internal_server_name", false, "Enables using ServerName from /varz")
flag.Parse()

opts.RetryInterval = time.Duration(retryInterval) * time.Second
Expand Down Expand Up @@ -158,14 +159,24 @@ necessary.`)
// Create an instance of the NATS exporter.
exp := exporter.NewExporter(opts)

if len(args) == 1 && opts.UseInternalServerID {
switch {
case len(args) == 1 && opts.UseInternalServerID:
// Pick the server id from the /varz endpoint info.
url := flag.Args()[0]
id := collector.GetServerIDFromVarz(url, opts.RetryInterval)
if err := exp.AddServer(id, url); err != nil {
collector.Fatalf("Unable to setup server in exporter: %s, %s: %v", id, url, err)
}
} else {

case len(args) == 1 && opts.UseServerName:
// Pick the server name from the /varz endpoint info.
url := flag.Args()[0]
id := collector.GetServerNameFromVarz(url, opts.RetryInterval)
if err := exp.AddServer(id, url); err != nil {
collector.Fatalf("Unable to setup server in exporter: %s, %s: %v", id, url, err)
}

default:
// For each URL specified, add the NATS server with the optional ID.
for _, arg := range args {
// This should make the http request to get the server id
Expand Down
27 changes: 19 additions & 8 deletions test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ func RunServer() *server.Server {
return RunServerWithPorts(ClientPort, MonitorPort)
}

// RunServerWithName runs the NATS server in a go routine
func RunServerWithName(name string) *server.Server {
return RunServerWithPortsAndName(ClientPort, MonitorPort, name)
}

// RunStreamingServer runs the STAN server in a go routine.
func RunStreamingServer() *nss.StanServer {
return RunStreamingServerWithPorts(nss.DefaultClusterID, ClientPort, MonitorPort)
Expand Down Expand Up @@ -105,8 +110,8 @@ func RunStreamingServerWithPorts(clusterID string, port, monitorPort int) *nss.S
return s
}

// RunServerWithPorts runs the NATS server with a monitor port in a go routine
func RunServerWithPorts(cport, mport int) *server.Server {
// RunServerWithPortsAndName runs the NATS server with a monitor port and a name in a go routine
func RunServerWithPortsAndName(cport, mport int, serverName string) *server.Server {
var enableLogging bool

resetPreviousHTTPConnections()
Expand All @@ -116,12 +121,13 @@ func RunServerWithPorts(cport, mport int) *server.Server {
// enableLogging = true

opts := &server.Options{
Host: "127.0.0.1",
Port: cport,
HTTPHost: "127.0.0.1",
HTTPPort: mport,
NoLog: !enableLogging,
NoSigs: true,
ServerName: serverName,
Host: "127.0.0.1",
Port: cport,
HTTPHost: "127.0.0.1",
HTTPPort: mport,
NoLog: !enableLogging,
NoSigs: true,
}

s := server.New(opts)
Expand Down Expand Up @@ -167,6 +173,11 @@ func RunServerWithPorts(cport, mport int) *server.Server {
panic("Unable to start NATS Server in Go Routine")
}

// RunServerWithPorts runs the NATS server with a monitor port in a go routine
func RunServerWithPorts(cport, mport int) *server.Server {
return RunServerWithPortsAndName(cport, mport, "")
}

var tempRoot = filepath.Join(os.TempDir(), "exporter")

// RunJetStreamServerWithPorts starts a JetStream server.
Expand Down

0 comments on commit 311823a

Please sign in to comment.