Skip to content

Commit

Permalink
added self label to queues and connections
Browse files Browse the repository at this point in the history
  • Loading branch information
kbudde committed Nov 24, 2018
1 parent c1df1fd commit 97f6768
Show file tree
Hide file tree
Showing 11 changed files with 178 additions and 108 deletions.
43 changes: 29 additions & 14 deletions bertmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,17 @@ import (
// implementation) allow parsing of BERT-encoded RabbitMQ replies in a
// way that's fully compatible with JSON parser from jsonmap.go
type rabbitBERTReply struct {
body []byte
body []byte
objects bert.Term
}

func makeBERTReply(body []byte) RabbitReply {
return &rabbitBERTReply{body}
func makeBERTReply(body []byte) (RabbitReply, error) {
rawObjects, err := bert.Decode(body)
return &rabbitBERTReply{body, rawObjects}, err
}

func (rep *rabbitBERTReply) MakeStatsInfo(labels []string) []StatsInfo {
rawObjects, err := bert.Decode(rep.body)
if err != nil {
log.WithField("error", err).Error("Error while decoding bert")
return make([]StatsInfo, 0)
}
rawObjects := rep.objects

objects, ok := rawObjects.([]bert.Term)
if !ok {
Expand All @@ -48,12 +46,7 @@ func (rep *rabbitBERTReply) MakeStatsInfo(labels []string) []StatsInfo {

func (rep *rabbitBERTReply) MakeMap() MetricMap {
flMap := make(MetricMap)
term, err := bert.Decode(rep.body)

if err != nil {
log.WithField("error", err).Error("Error while decoding bert")
return flMap
}
term := rep.objects

parseProplist(&flMap, "", term)
return flMap
Expand Down Expand Up @@ -321,3 +314,25 @@ func (err *bertDecodeError) Error() string {
func bertError(message string, object interface{}) error {
return &bertDecodeError{message, object}
}

func (rep *rabbitBERTReply) GetString(label string) (string, bool) {
var resValue string
var result bool
result = false

iterateBertKV(rep.objects, func(key string, value interface{}) bool {
//Check if current key should be saved as label

if key == label {
tmp, ok := parseBertStringy(value)
if !ok {
return false
}
resValue = tmp
result = true
return false
}
return true
})
return resValue, result
}
8 changes: 4 additions & 4 deletions bertmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ func assertBertStatsEquivalence(t *testing.T, baseFileName string, labels []stri
t.Helper()
json, bert := tryReadFiles(t, baseFileName, "json", "bert")

jsonReply := makeJSONReply(json)
bertReply := makeBERTReply(bert)
jsonReply, _ := makeJSONReply(json)
bertReply, _ := makeBERTReply(bert)

bertParsed := bertReply.MakeStatsInfo(labels)
jsonParsed := jsonReply.MakeStatsInfo(labels)
Expand All @@ -67,8 +67,8 @@ func assertBertStatsEquivalence(t *testing.T, baseFileName string, labels []stri
func assertBertMetricMapEquivalence(t *testing.T, baseFileName string) {
json, bert := tryReadFiles(t, baseFileName, "json", "bert")

jsonReply := makeJSONReply(json)
bertReply := makeBERTReply(bert)
jsonReply, _ := makeJSONReply(json)
bertReply, _ := makeBERTReply(bert)

bertParsed := bertReply.MakeMap()
jsonParsed := jsonReply.MakeMap()
Expand Down
8 changes: 6 additions & 2 deletions decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@ type RabbitReply interface {
// RabbitMQ objects (i.e. list of queues, exchanges, etc.).
// Failure to parse should result in an empty result list.
MakeStatsInfo([]string) []StatsInfo

// GetString returns the string value for the given key
// If the key cannot be found the second return is false
GetString(key string) (string, bool)
}

// MakeReply instantiates the apropriate reply parser for a given
// reply and the current configuration.
func MakeReply(config rabbitExporterConfig, body []byte) (RabbitReply, error) {
if isCapEnabled(config, rabbitCapBert) {
return makeBERTReply(body), nil
return makeBERTReply(body)
}
return makeJSONReply(body), nil
return makeJSONReply(body)
}
2 changes: 2 additions & 0 deletions exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func (e *exporter) Collect(ch chan<- prometheus.Metric) {
ctx = context.WithValue(ctx, endpointScrapeDuration, e.endpointScrapeDurationMetric)
ctx = context.WithValue(ctx, endpointUpMetric, e.endpointUpMetric)
if err := collectWithDuration(ctx, e.overviewExporter, "overview", ch); err != nil {
log.WithError(err).Warn("retrieving overview failed")
allUp = false
}

Expand All @@ -97,6 +98,7 @@ func (e *exporter) Collect(ch chan<- prometheus.Metric) {
for name, ex := range e.exporter {

if err := collectWithDuration(ctx, ex, name, ch); err != nil {
log.WithError(err).Warn("retrieving " + name + " failed")
allUp = false
}
}
Expand Down
23 changes: 18 additions & 5 deletions exporter_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ func init() {
}

var (
connectionLabels = []string{"vhost", "node", "peer_host", "user"}
connectionLabelsStateMetric = []string{"vhost", "node", "peer_host", "user", "state"}
connectionLabelKeys = []string{"vhost", "node", "peer_host", "user", "state"}
connectionLabels = []string{"vhost", "node", "peer_host", "user", "self"}
connectionLabelsStateMetric = []string{"vhost", "node", "peer_host", "user", "state", "self"}
connectionLabelKeys = []string{"vhost", "node", "peer_host", "user", "state", "node"}

connectionGaugeVec = map[string]*prometheus.GaugeVec{
"channels": newGaugeVec("connection_channels", "number of channels in use", connectionLabels),
Expand Down Expand Up @@ -52,17 +52,30 @@ func (e exporterConnections) Collect(ctx context.Context, ch chan<- prometheus.M
}
e.stateMetric.Reset()

selfNode := ""
if n, ok := ctx.Value(nodeName).(string); ok {
selfNode = n
}

for key, gauge := range e.metricsGV {
for _, connD := range connectionData {
if value, ok := connD.metrics[key]; ok {
gauge.WithLabelValues(connD.labels["vhost"], connD.labels["node"], connD.labels["peer_host"], connD.labels["user"]).Add(value)
self := "0"
if connD.labels["node"] == selfNode {
self = "1"
}
gauge.WithLabelValues(connD.labels["vhost"], connD.labels["node"], connD.labels["peer_host"], connD.labels["user"], self).Add(value)
}
}
}

for _, connD := range connectionData {
if _, ok := connD.metrics["channels"]; ok { // "channels" is used to retrieve one record per connection for setting the state
e.stateMetric.WithLabelValues(connD.labels["vhost"], connD.labels["node"], connD.labels["peer_host"], connD.labels["user"], connD.labels["state"]).Add(1)
self := "0"
if connD.labels["node"] == selfNode {
self = "1"
}
e.stateMetric.WithLabelValues(connD.labels["vhost"], connD.labels["node"], connD.labels["peer_host"], connD.labels["user"], connD.labels["state"], self).Add(1)
}
}

Expand Down
15 changes: 5 additions & 10 deletions exporter_overview.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package main

import (
"bytes"
"context"
"encoding/json"
"io"

log "github.com/Sirupsen/logrus"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -58,17 +55,15 @@ func (e *exporterOverview) Collect(ctx context.Context, ch chan<- prometheus.Met
return err
}

dec := json.NewDecoder(bytes.NewReader(body))
var nodeInfo NodeInfo
if err := dec.Decode(&nodeInfo); err == io.EOF {
return err
}
e.nodeInfo = nodeInfo

reply, err := MakeReply(config, body)
if err != nil {
return err
}

e.nodeInfo.Node, _ = reply.GetString("node")
e.nodeInfo.ErlangVersion, _ = reply.GetString("erlang_version")
e.nodeInfo.RabbitmqVersion, _ = reply.GetString("rabbitmq_version")

rabbitMqOverviewData := reply.MakeMap()

log.WithField("overviewData", rabbitMqOverviewData).Debug("Overview data")
Expand Down
29 changes: 23 additions & 6 deletions exporter_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ func init() {
}

var (
queueLabels = []string{"vhost", "queue", "durable", "policy"}
queueLabelKeys = []string{"vhost", "name", "durable", "policy", "state"}
queueLabels = []string{"vhost", "queue", "durable", "policy", "self"}
queueLabelKeys = []string{"vhost", "name", "durable", "policy", "state", "node"}

queueGaugeVec = map[string]*prometheus.GaugeVec{
"messages_ready": newGaugeVec("queue_messages_ready", "Number of messages ready to be delivered to clients.", queueLabels),
Expand Down Expand Up @@ -93,6 +93,11 @@ func (e exporterQueue) Collect(ctx context.Context, ch chan<- prometheus.Metric)
}
}

selfNode := ""
if n, ok := ctx.Value(nodeName).(string); ok {
selfNode = n
}

rabbitMqQueueData, err := getStatsInfo(config, "queues", queueLabelKeys)

if err != nil {
Expand All @@ -110,8 +115,12 @@ func (e exporterQueue) Collect(ctx context.Context, ch chan<- prometheus.Metric)
if skipVhost := config.SkipVHost.MatchString(strings.ToLower(vname)); !skipVhost {
if matchInclude := config.IncludeQueues.MatchString(strings.ToLower(qname)); matchInclude {
if matchSkip := config.SkipQueues.MatchString(strings.ToLower(qname)); !matchSkip {
self := "0"
if queue.labels["node"] == selfNode {
self = "1"
}
// log.WithFields(log.Fields{"vhost": queue.labels["vhost"], "queue": queue.labels["name"], "key": key, "value": value}).Info("Set queue metric for key")
gaugevec.WithLabelValues(queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"]).Set(value)
gaugevec.WithLabelValues(queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self).Set(value)
}
}
}
Expand All @@ -129,7 +138,11 @@ func (e exporterQueue) Collect(ctx context.Context, ch chan<- prometheus.Metric)
if skipVhost := config.SkipVHost.MatchString(strings.ToLower(vname)); !skipVhost {
if matchInclude := config.IncludeQueues.MatchString(strings.ToLower(qname)); matchInclude {
if matchSkip := config.SkipQueues.MatchString(strings.ToLower(qname)); !matchSkip {
e.stateMetric.WithLabelValues(queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], queue.labels["state"]).Set(1)
self := "0"
if queue.labels["node"] == selfNode {
self = "1"
}
e.stateMetric.WithLabelValues(queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self, queue.labels["state"]).Set(1)
}
}
}
Expand All @@ -146,10 +159,14 @@ func (e exporterQueue) Collect(ctx context.Context, ch chan<- prometheus.Metric)
if skipVhost := config.SkipVHost.MatchString(strings.ToLower(vname)); !skipVhost {
if matchInclude := config.IncludeQueues.MatchString(strings.ToLower(qname)); matchInclude {
if matchSkip := config.SkipQueues.MatchString(strings.ToLower(qname)); !matchSkip {
self := "0"
if queue.labels["node"] == selfNode {
self = "1"
}
if value, ok := queue.metrics[key]; ok {
ch <- prometheus.MustNewConstMetric(countvec, prometheus.CounterValue, value, queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"])
ch <- prometheus.MustNewConstMetric(countvec, prometheus.CounterValue, value, queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self)
} else {
ch <- prometheus.MustNewConstMetric(countvec, prometheus.CounterValue, 0, queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"])
ch <- prometheus.MustNewConstMetric(countvec, prometheus.CounterValue, 0, queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self)
}
}
}
Expand Down
Loading

0 comments on commit 97f6768

Please sign in to comment.