From d4dcdd264473ca984e97a7df183b8d8824682369 Mon Sep 17 00:00:00 2001 From: Faisal K Date: Fri, 17 Feb 2023 00:12:48 +0100 Subject: [PATCH] Add option to collect detailed metrics from /connz endpoint Signed-off-by: Faisal K --- README.md | 2 + collector/connz.go | 419 +++++++++++++++++++++++++++++++++++++++---- exporter/exporter.go | 11 +- main.go | 2 + 4 files changed, 396 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index 00b2b10b..6d6f4ff6 100644 --- a/README.md +++ b/README.md @@ -55,6 +55,8 @@ prometheus-nats-exporter url Get streaming channel metrics. -connz Get connection metrics. + -connz_detailed + Get detailed connection metrics for each client. Enables flag "-connz" implicitly. -healthz Get health metrics. -gatewayz diff --git a/collector/connz.go b/collector/connz.go index 3a2572c6..c8514996 100644 --- a/collector/connz.go +++ b/collector/connz.go @@ -15,14 +15,24 @@ package collector import ( + "encoding/json" + "fmt" "net/http" + "strconv" + "strings" "sync" + "time" "github.com/prometheus/client_golang/prometheus" ) +const ( + connzEndpoint = "connz" + connzDetailedEndpoint = "connz_detailed" +) + func isConnzEndpoint(system, endpoint string) bool { - return system == CoreSystem && endpoint == "connz" + return system == CoreSystem && (endpoint == connzEndpoint || endpoint == connzDetailedEndpoint) } type connzCollector struct { @@ -30,57 +40,190 @@ type connzCollector struct { httpClient *http.Client servers []*CollectedServer + detailed bool - numConnections *prometheus.Desc - total *prometheus.Desc - offset *prometheus.Desc - limit *prometheus.Desc - pendingBytes *prometheus.Desc + numConnections *prometheus.Desc + total *prometheus.Desc + offset *prometheus.Desc + limit *prometheus.Desc + totalPendingBytes *prometheus.Desc + totalSubscriptions *prometheus.Desc + totalInBytes *prometheus.Desc + totalOutBytes *prometheus.Desc + totalInMsgs *prometheus.Desc + totalOutMsgs *prometheus.Desc + connzCollectorDetailed } -func newConnzCollector(system, endpoint string, servers []*CollectedServer) prometheus.Collector { - nc := &connzCollector{ +type connzCollectorDetailed struct { + pendingBytes *prometheus.Desc + subscriptions *prometheus.Desc + inBytes *prometheus.Desc + outBytes *prometheus.Desc + inMsgs *prometheus.Desc + outMsgs *prometheus.Desc + start *prometheus.Desc + lastActivity *prometheus.Desc + rtt *prometheus.Desc + uptime *prometheus.Desc + idle *prometheus.Desc +} + +func createConnzCollector(system string) *connzCollector { + summaryLabels := []string{"server_id"} + return &connzCollector{ httpClient: http.DefaultClient, numConnections: prometheus.NewDesc( - prometheus.BuildFQName(system, endpoint, "num_connections"), + prometheus.BuildFQName(system, connzEndpoint, "num_connections"), "num_connections", - []string{"server_id"}, + summaryLabels, nil, ), offset: prometheus.NewDesc( - prometheus.BuildFQName(system, endpoint, "offset"), + prometheus.BuildFQName(system, connzEndpoint, "offset"), "offset", - []string{"server_id"}, + summaryLabels, nil, ), total: prometheus.NewDesc( - prometheus.BuildFQName(system, endpoint, "total"), + prometheus.BuildFQName(system, connzEndpoint, "total"), "total", - []string{"server_id"}, + summaryLabels, nil, ), limit: prometheus.NewDesc( - prometheus.BuildFQName(system, endpoint, "limit"), + prometheus.BuildFQName(system, connzEndpoint, "limit"), "limit", - []string{"server_id"}, + summaryLabels, nil, ), - pendingBytes: prometheus.NewDesc( - prometheus.BuildFQName(system, endpoint, "pending_bytes"), + totalPendingBytes: prometheus.NewDesc( + prometheus.BuildFQName(system, connzEndpoint, "pending_bytes"), "pending_bytes", - []string{"server_id"}, + summaryLabels, + nil, + ), + totalSubscriptions: prometheus.NewDesc( + prometheus.BuildFQName(system, connzEndpoint, "subscriptions"), + "subscriptions", + summaryLabels, + nil, + ), + totalInBytes: prometheus.NewDesc( + prometheus.BuildFQName(system, connzEndpoint, "in_bytes"), + "in_bytes", + summaryLabels, + nil, + ), + totalOutBytes: prometheus.NewDesc( + prometheus.BuildFQName(system, connzEndpoint, "out_bytes"), + "out_bytes", + summaryLabels, + nil, + ), + totalInMsgs: prometheus.NewDesc( + prometheus.BuildFQName(system, connzEndpoint, "in_msgs"), + "in_msgs", + summaryLabels, + nil, + ), + totalOutMsgs: prometheus.NewDesc( + prometheus.BuildFQName(system, connzEndpoint, "out_msgs"), + "out_msgs", + summaryLabels, nil, ), } +} +func createConnzDetailedCollector(system string) *connzCollector { + connzCollector := createConnzCollector(system) + detailLabels := []string{"server_id", "cid", "kind", "type", "ip", "port", "name", "lang", + "version", "tls_version", "tls_cipher_suite"} + connzCollector.pendingBytes = prometheus.NewDesc( + prometheus.BuildFQName(system, connzEndpoint, "pending_bytes"), + "pending_bytes", + detailLabels, + nil, + ) + connzCollector.subscriptions = prometheus.NewDesc( + prometheus.BuildFQName(system, connzEndpoint, "subscriptions"), + "subscriptions", + detailLabels, + nil, + ) + connzCollector.inBytes = prometheus.NewDesc( + prometheus.BuildFQName(system, connzEndpoint, "in_bytes"), + "in_bytes", + detailLabels, + nil, + ) + connzCollector.outBytes = prometheus.NewDesc( + prometheus.BuildFQName(system, connzEndpoint, "out_bytes"), + "out_bytes", + detailLabels, + nil, + ) + connzCollector.inMsgs = prometheus.NewDesc( + prometheus.BuildFQName(system, connzEndpoint, "in_msgs"), + "in_msgs", + detailLabels, + nil, + ) + connzCollector.outMsgs = prometheus.NewDesc( + prometheus.BuildFQName(system, connzEndpoint, "out_msgs"), + "out_msgs", + detailLabels, + nil, + ) + connzCollector.start = prometheus.NewDesc( + prometheus.BuildFQName(system, connzEndpoint, "start"), + "epoch time at which the connection was started", + detailLabels, + nil, + ) + connzCollector.lastActivity = prometheus.NewDesc( + prometheus.BuildFQName(system, connzEndpoint, "last_activity"), + "epoch time at which the last activity was registred", + detailLabels, + nil, + ) + connzCollector.rtt = prometheus.NewDesc( + prometheus.BuildFQName(system, connzEndpoint, "rtt"), + "response time latency in microseconds", + detailLabels, + nil, + ) + connzCollector.uptime = prometheus.NewDesc( + prometheus.BuildFQName(system, connzEndpoint, "uptime"), + "uptime duration in milliseconds", + detailLabels, + nil, + ) + connzCollector.idle = prometheus.NewDesc( + prometheus.BuildFQName(system, connzEndpoint, "idle"), + "idle time duration in milliseconds", + detailLabels, + nil, + ) + return connzCollector +} + +func newConnzCollector(system, endpoint string, servers []*CollectedServer) prometheus.Collector { + var nc *connzCollector + if endpoint == connzDetailedEndpoint { + nc = createConnzDetailedCollector(system) + nc.detailed = true + } else { + nc = createConnzCollector(system) + } nc.servers = make([]*CollectedServer, len(servers)) for i, s := range servers { nc.servers[i] = &CollectedServer{ ID: s.ID, - URL: s.URL + "/connz", + URL: s.URL + "/" + connzEndpoint, } } - return nc } @@ -97,26 +240,234 @@ func (nc *connzCollector) Collect(ch chan<- prometheus.Metric) { continue } - var pendingBytes = 0 + var pendingBytes, subscriptions, inBytes, outBytes, inMsgs, outMsgs float64 for _, conn := range resp.Connections { pendingBytes += conn.PendingBytes + subscriptions += conn.Subscriptions + inBytes += conn.InBytes + outBytes += conn.OutBytes + inMsgs += conn.InMsgs + outMsgs += conn.OutMsgs + if nc.detailed { + detailLabelValues := []string{server.ID, conn.Cid, conn.Kind, conn.Type, conn.IP, conn.Port, + conn.Name, conn.Lang, conn.Version, conn.TLSVersion, conn.TLSCipherSuite} + ch <- prometheus.MustNewConstMetric(nc.pendingBytes, prometheus.GaugeValue, conn.PendingBytes, detailLabelValues...) + ch <- prometheus.MustNewConstMetric(nc.subscriptions, prometheus.GaugeValue, conn.Subscriptions, + detailLabelValues...) + ch <- prometheus.MustNewConstMetric(nc.inBytes, prometheus.CounterValue, conn.InBytes, detailLabelValues...) + ch <- prometheus.MustNewConstMetric(nc.outBytes, prometheus.CounterValue, conn.OutBytes, detailLabelValues...) + ch <- prometheus.MustNewConstMetric(nc.inMsgs, prometheus.CounterValue, conn.InMsgs, detailLabelValues...) + ch <- prometheus.MustNewConstMetric(nc.outMsgs, prometheus.CounterValue, conn.OutMsgs, detailLabelValues...) + ch <- prometheus.MustNewConstMetric(nc.start, prometheus.UntypedValue, conn.Start, detailLabelValues...) + ch <- prometheus.MustNewConstMetric(nc.lastActivity, prometheus.UntypedValue, conn.LastActivity, + detailLabelValues...) + ch <- prometheus.MustNewConstMetric(nc.rtt, prometheus.GaugeValue, conn.Rtt, detailLabelValues...) + ch <- prometheus.MustNewConstMetric(nc.uptime, prometheus.UntypedValue, conn.Uptime, detailLabelValues...) + ch <- prometheus.MustNewConstMetric(nc.idle, prometheus.GaugeValue, conn.Idle, detailLabelValues...) + } } - ch <- prometheus.MustNewConstMetric(nc.numConnections, prometheus.GaugeValue, float64(resp.NumConnections), server.ID) - ch <- prometheus.MustNewConstMetric(nc.total, prometheus.GaugeValue, float64(resp.Total), server.ID) - ch <- prometheus.MustNewConstMetric(nc.offset, prometheus.GaugeValue, float64(resp.Offset), server.ID) - ch <- prometheus.MustNewConstMetric(nc.limit, prometheus.GaugeValue, float64(resp.Limit), server.ID) - ch <- prometheus.MustNewConstMetric(nc.pendingBytes, prometheus.GaugeValue, float64(pendingBytes), server.ID) + ch <- prometheus.MustNewConstMetric(nc.numConnections, prometheus.GaugeValue, resp.NumConnections, server.ID) + ch <- prometheus.MustNewConstMetric(nc.total, prometheus.GaugeValue, resp.Total, server.ID) + ch <- prometheus.MustNewConstMetric(nc.offset, prometheus.GaugeValue, resp.Offset, server.ID) + ch <- prometheus.MustNewConstMetric(nc.limit, prometheus.GaugeValue, resp.Limit, server.ID) + ch <- prometheus.MustNewConstMetric(nc.totalPendingBytes, prometheus.GaugeValue, pendingBytes, server.ID) + ch <- prometheus.MustNewConstMetric(nc.totalSubscriptions, prometheus.GaugeValue, subscriptions, server.ID) + ch <- prometheus.MustNewConstMetric(nc.totalInBytes, prometheus.CounterValue, inBytes, server.ID) + ch <- prometheus.MustNewConstMetric(nc.totalOutBytes, prometheus.CounterValue, outBytes, server.ID) + ch <- prometheus.MustNewConstMetric(nc.totalInMsgs, prometheus.CounterValue, inMsgs, server.ID) + ch <- prometheus.MustNewConstMetric(nc.totalOutMsgs, prometheus.CounterValue, outMsgs, server.ID) } } // Connz output type Connz struct { - NumConnections int `json:"num_connections"` - Total int `json:"total"` - Offset int `json:"offset"` - Limit int `json:"limit"` - Connections []struct { - PendingBytes int `json:"pending_bytes"` - } `json:"connections"` + NumConnections float64 `json:"num_connections"` + Total float64 `json:"total"` + Offset float64 `json:"offset"` + Limit float64 `json:"limit"` + Connections []ConnzConnection `json:"connections"` +} + +// ConnzConnection represents the connections details +type ConnzConnection struct { + Cid string `json:"cid"` + Kind string `json:"kind"` + Type string `json:"type"` + IP string `json:"ip"` + Port string `json:"port"` + Start float64 `json:"start"` + LastActivity float64 `json:"last_activity"` + Rtt float64 `json:"rtt"` + Uptime float64 `json:"uptime"` + Idle float64 `json:"idle"` + PendingBytes float64 `json:"pending_bytes"` + InMsgs float64 `json:"in_msgs"` + OutMsgs float64 `json:"out_msgs"` + InBytes float64 `json:"in_bytes"` + OutBytes float64 `json:"out_bytes"` + Subscriptions float64 `json:"subscriptions"` + Name string `json:"name"` + Lang string `json:"lang"` + Version string `json:"version"` + TLSVersion string `json:"tls_version"` + TLSCipherSuite string `json:"tls_cipher_suite"` +} + +// UnmarshalJSON converts JSON string to struct. This is required as we want to +// parse time or duration fields as `time.Duration` and then to milliseconds +func (c *ConnzConnection) UnmarshalJSON(data []byte) error { + var connection map[string]interface{} + if err := json.Unmarshal(data, &connection); err != nil { + return err + } + if val, exists := connection["cid"]; exists { + c.Cid = fmt.Sprintf("%v", val) + } + if val, exists := connection["kind"]; exists { + c.Kind = val.(string) + } + if val, exists := connection["type"]; exists { + c.Type = val.(string) + } + if val, exists := connection["ip"]; exists { + c.IP = val.(string) + } + if val, exists := connection["port"]; exists { + c.Port = fmt.Sprintf("%v", val) + } + if val, exists := connection["start"]; exists { + c.Start = parseDateString(val.(string)) + } + if val, exists := connection["last_activity"]; exists { + c.LastActivity = parseDateString(val.(string)) + } + if val, exists := connection["rtt"]; exists { + // rtt should be in seconds at most! + if parsedVal, err := time.ParseDuration(val.(string)); err == nil { + c.Rtt = float64(parsedVal.Microseconds()) + } else { + Errorf("string %s could not be parsed as duration for rtt: %s", val.(string), err) + c.Rtt = -1 + } + } + if val, exists := connection["uptime"]; exists { + c.Uptime = parseDuration(val.(string)) + } + if val, exists := connection["idle"]; exists { + c.Idle = parseDuration(val.(string)) + } + if val, exists := connection["pending_bytes"]; exists { + c.PendingBytes = val.(float64) + } + if val, exists := connection["in_msgs"]; exists { + c.InMsgs = val.(float64) + } + if val, exists := connection["out_msgs"]; exists { + c.OutMsgs = val.(float64) + } + if val, exists := connection["in_bytes"]; exists { + c.InBytes = val.(float64) + } + if val, exists := connection["out_bytes"]; exists { + c.OutBytes = val.(float64) + } + if val, exists := connection["subscriptions"]; exists { + c.Subscriptions = val.(float64) + } + if val, exists := connection["name"]; exists { + c.Name = val.(string) + } + if val, exists := connection["lang"]; exists { + c.Lang = val.(string) + } + if val, exists := connection["version"]; exists { + c.Version = val.(string) + } + if val, exists := connection["tls_version"]; exists { + c.TLSVersion = val.(string) + } + if val, exists := connection["tls_cipher_suite"]; exists { + c.TLSCipherSuite = val.(string) + } + return nil +} + +// parse a date-time string as epoch milliseconds +func parseDateString(data string) float64 { + theTime, err := time.Parse(time.RFC3339Nano, data) + if err != nil { + Errorf("could not parse value %s as a date-time object using the layout %s", data, time.RFC3339Nano) + return -1 + } + return float64(theTime.UnixMilli()) +} + +// parse the duration as epoch milliseconds +// for some reason NATS server deviated away from the allowed options +// for duration. Please see https://github.com/nats-io/nats-server/blob/main/server/monitor.go#L1309 +// or (if the lines changed) check the function `server.myUptime(d time.Duration) string ` +// duration can possibly have `y`, `d`, `h`, `m`, `s` +// for years 365 days is factored in NATS server +func parseDuration(data string) float64 { + accruedHours, i := extractHoursFromYearsAndDays(data) + if accruedHours == -1 { + return -1 + } + durationWithoutYearsAndDays := data[i:] + splitByHours := strings.Split(durationWithoutYearsAndDays, "h") + durationToParse := "" + switch len(splitByHours) { + case 1: + durationToParse = fmt.Sprintf("%dh%s", accruedHours, splitByHours[0]) + case 2: + if hours, err := strconv.Atoi(splitByHours[0]); err == nil { + accruedHours += hours + durationToParse = fmt.Sprintf("%dh%s", accruedHours, splitByHours[1]) + } else { + Errorf("string %s could not be parsed as duration: %s", data, err) + return -1 + } + default: + Errorf("string %s could not be parsed as duration", data) + return -1 + } + parsedValue, err := time.ParseDuration(durationToParse) + if err == nil { + return float64(parsedValue.Milliseconds()) + } + Errorf("string %s could not be parsed as duration: %s", data, err) + return -1 +} + +// extract years and days as hours from the provided string +// to be able to parse the string as duration +func extractHoursFromYearsAndDays(data string) (int, int) { + accruedHours := 0 + valueSoFar := "" + for i, v := range data { + switch v { + case 'y': + if value, err := strconv.Atoi(valueSoFar); err == nil { + accruedHours += value * 365 * 24 + valueSoFar = "" + } else { + Errorf("string %s could not be parsed as duration: %s", data, err) + return -1, -1 + } + case 'd': + value, err := strconv.Atoi(valueSoFar) + if err == nil { + accruedHours += value * 24 + return accruedHours, i + 1 + } + Errorf("string %s could not be parsed as duration: %s", data, err) + return -1, -1 + case 'h', 'm', 's': + return accruedHours, 0 + default: + valueSoFar += string(v) + } + } + return accruedHours, 0 } diff --git a/exporter/exporter.go b/exporter/exporter.go index 6657dcc2..2a475f07 100644 --- a/exporter/exporter.go +++ b/exporter/exporter.go @@ -46,6 +46,7 @@ type NATSExporterOptions struct { ScrapePath string GetHealthz bool GetConnz bool + GetConnzDetailed bool GetVarz bool GetSubz bool GetRoutez bool @@ -174,9 +175,9 @@ func (ne *NATSExporter) InitializeCollectors() error { } getJsz := opts.GetJszFilter != "" - if !opts.GetHealthz && !opts.GetConnz && !opts.GetRoutez && !opts.GetSubz && !opts.GetVarz && - !opts.GetGatewayz && !opts.GetLeafz && !opts.GetStreamingChannelz && - !opts.GetStreamingServerz && !opts.GetReplicatorVarz && !getJsz { + if !opts.GetHealthz && !opts.GetConnz && !opts.GetConnzDetailed && !opts.GetRoutez && + !opts.GetSubz && !opts.GetVarz && !opts.GetGatewayz && !opts.GetLeafz && + !opts.GetStreamingChannelz && !opts.GetStreamingServerz && !opts.GetReplicatorVarz && !getJsz { return fmt.Errorf("no Collectors specfied") } if opts.GetReplicatorVarz && opts.GetVarz { @@ -191,7 +192,9 @@ func (ne *NATSExporter) InitializeCollectors() error { if opts.GetHealthz { ne.createCollector(collector.CoreSystem, "healthz") } - if opts.GetConnz { + if opts.GetConnzDetailed { + ne.createCollector(collector.CoreSystem, "connz_detailed") + } else if opts.GetConnz { ne.createCollector(collector.CoreSystem, "connz") } if opts.GetGatewayz { diff --git a/main.go b/main.go index 2101e1bb..80d5e52c 100644 --- a/main.go +++ b/main.go @@ -111,6 +111,8 @@ func main() { flag.BoolVar(&opts.Trace, "V", false, "Enable trace log level.") flag.BoolVar(&debugAndTrace, "DV", false, "Enable debug and trace log levels.") flag.BoolVar(&opts.GetConnz, "connz", false, "Get connection metrics.") + flag.BoolVar(&opts.GetConnzDetailed, "connz_detailed", false, + "Get detailed connection metrics for each client. Enables flag `connz` implicitly.") flag.BoolVar(&opts.GetHealthz, "healthz", false, "Get health metrics.") flag.BoolVar(&opts.GetReplicatorVarz, "replicatorVarz", false, "Get replicator general metrics.") flag.BoolVar(&opts.GetGatewayz, "gatewayz", false, "Get gateway metrics.")