From 97f676825428c02a0a168a8236491179073c24d0 Mon Sep 17 00:00:00 2001 From: Kris Budde Date: Sat, 24 Nov 2018 21:53:20 +0100 Subject: [PATCH] added self label to queues and connections --- bertmap.go | 43 +++++++++++------ bertmap_test.go | 8 ++-- decoder.go | 8 +++- exporter.go | 2 + exporter_connections.go | 23 +++++++-- exporter_overview.go | 15 ++---- exporter_queue.go | 29 +++++++++--- exporter_test.go | 100 ++++++++++++++++++++-------------------- integration_test.go | 8 ++-- jsonmap.go | 42 +++++++++++++---- jsonmap_test.go | 8 ++-- 11 files changed, 178 insertions(+), 108 deletions(-) diff --git a/bertmap.go b/bertmap.go index 2c7aac6..61d71a9 100644 --- a/bertmap.go +++ b/bertmap.go @@ -12,19 +12,17 @@ import ( // implementation) allow parsing of BERT-encoded RabbitMQ replies in a // way that's fully compatible with JSON parser from jsonmap.go type rabbitBERTReply struct { - body []byte + body []byte + objects bert.Term } -func makeBERTReply(body []byte) RabbitReply { - return &rabbitBERTReply{body} +func makeBERTReply(body []byte) (RabbitReply, error) { + rawObjects, err := bert.Decode(body) + return &rabbitBERTReply{body, rawObjects}, err } func (rep *rabbitBERTReply) MakeStatsInfo(labels []string) []StatsInfo { - rawObjects, err := bert.Decode(rep.body) - if err != nil { - log.WithField("error", err).Error("Error while decoding bert") - return make([]StatsInfo, 0) - } + rawObjects := rep.objects objects, ok := rawObjects.([]bert.Term) if !ok { @@ -48,12 +46,7 @@ func (rep *rabbitBERTReply) MakeStatsInfo(labels []string) []StatsInfo { func (rep *rabbitBERTReply) MakeMap() MetricMap { flMap := make(MetricMap) - term, err := bert.Decode(rep.body) - - if err != nil { - log.WithField("error", err).Error("Error while decoding bert") - return flMap - } + term := rep.objects parseProplist(&flMap, "", term) return flMap @@ -321,3 +314,25 @@ func (err *bertDecodeError) Error() string { func bertError(message string, object interface{}) error { return &bertDecodeError{message, object} } + +func (rep *rabbitBERTReply) GetString(label string) (string, bool) { + var resValue string + var result bool + result = false + + iterateBertKV(rep.objects, func(key string, value interface{}) bool { + //Check if current key should be saved as label + + if key == label { + tmp, ok := parseBertStringy(value) + if !ok { + return false + } + resValue = tmp + result = true + return false + } + return true + }) + return resValue, result +} diff --git a/bertmap_test.go b/bertmap_test.go index 4971dab..103faed 100644 --- a/bertmap_test.go +++ b/bertmap_test.go @@ -53,8 +53,8 @@ func assertBertStatsEquivalence(t *testing.T, baseFileName string, labels []stri t.Helper() json, bert := tryReadFiles(t, baseFileName, "json", "bert") - jsonReply := makeJSONReply(json) - bertReply := makeBERTReply(bert) + jsonReply, _ := makeJSONReply(json) + bertReply, _ := makeBERTReply(bert) bertParsed := bertReply.MakeStatsInfo(labels) jsonParsed := jsonReply.MakeStatsInfo(labels) @@ -67,8 +67,8 @@ func assertBertStatsEquivalence(t *testing.T, baseFileName string, labels []stri func assertBertMetricMapEquivalence(t *testing.T, baseFileName string) { json, bert := tryReadFiles(t, baseFileName, "json", "bert") - jsonReply := makeJSONReply(json) - bertReply := makeBERTReply(bert) + jsonReply, _ := makeJSONReply(json) + bertReply, _ := makeBERTReply(bert) bertParsed := bertReply.MakeMap() jsonParsed := jsonReply.MakeMap() diff --git a/decoder.go b/decoder.go index efe9248..2f31cc2 100644 --- a/decoder.go +++ b/decoder.go @@ -25,13 +25,17 @@ type RabbitReply interface { // RabbitMQ objects (i.e. list of queues, exchanges, etc.). // Failure to parse should result in an empty result list. MakeStatsInfo([]string) []StatsInfo + + // GetString returns the string value for the given key + // If the key cannot be found the second return is false + GetString(key string) (string, bool) } // MakeReply instantiates the apropriate reply parser for a given // reply and the current configuration. func MakeReply(config rabbitExporterConfig, body []byte) (RabbitReply, error) { if isCapEnabled(config, rabbitCapBert) { - return makeBERTReply(body), nil + return makeBERTReply(body) } - return makeJSONReply(body), nil + return makeJSONReply(body) } diff --git a/exporter.go b/exporter.go index b0d4a23..90f6ced 100644 --- a/exporter.go +++ b/exporter.go @@ -89,6 +89,7 @@ func (e *exporter) Collect(ch chan<- prometheus.Metric) { ctx = context.WithValue(ctx, endpointScrapeDuration, e.endpointScrapeDurationMetric) ctx = context.WithValue(ctx, endpointUpMetric, e.endpointUpMetric) if err := collectWithDuration(ctx, e.overviewExporter, "overview", ch); err != nil { + log.WithError(err).Warn("retrieving overview failed") allUp = false } @@ -97,6 +98,7 @@ func (e *exporter) Collect(ch chan<- prometheus.Metric) { for name, ex := range e.exporter { if err := collectWithDuration(ctx, ex, name, ch); err != nil { + log.WithError(err).Warn("retrieving " + name + " failed") allUp = false } } diff --git a/exporter_connections.go b/exporter_connections.go index eea877f..42aa0ab 100644 --- a/exporter_connections.go +++ b/exporter_connections.go @@ -11,9 +11,9 @@ func init() { } var ( - connectionLabels = []string{"vhost", "node", "peer_host", "user"} - connectionLabelsStateMetric = []string{"vhost", "node", "peer_host", "user", "state"} - connectionLabelKeys = []string{"vhost", "node", "peer_host", "user", "state"} + connectionLabels = []string{"vhost", "node", "peer_host", "user", "self"} + connectionLabelsStateMetric = []string{"vhost", "node", "peer_host", "user", "state", "self"} + connectionLabelKeys = []string{"vhost", "node", "peer_host", "user", "state", "node"} connectionGaugeVec = map[string]*prometheus.GaugeVec{ "channels": newGaugeVec("connection_channels", "number of channels in use", connectionLabels), @@ -52,17 +52,30 @@ func (e exporterConnections) Collect(ctx context.Context, ch chan<- prometheus.M } e.stateMetric.Reset() + selfNode := "" + if n, ok := ctx.Value(nodeName).(string); ok { + selfNode = n + } + for key, gauge := range e.metricsGV { for _, connD := range connectionData { if value, ok := connD.metrics[key]; ok { - gauge.WithLabelValues(connD.labels["vhost"], connD.labels["node"], connD.labels["peer_host"], connD.labels["user"]).Add(value) + self := "0" + if connD.labels["node"] == selfNode { + self = "1" + } + gauge.WithLabelValues(connD.labels["vhost"], connD.labels["node"], connD.labels["peer_host"], connD.labels["user"], self).Add(value) } } } for _, connD := range connectionData { if _, ok := connD.metrics["channels"]; ok { // "channels" is used to retrieve one record per connection for setting the state - e.stateMetric.WithLabelValues(connD.labels["vhost"], connD.labels["node"], connD.labels["peer_host"], connD.labels["user"], connD.labels["state"]).Add(1) + self := "0" + if connD.labels["node"] == selfNode { + self = "1" + } + e.stateMetric.WithLabelValues(connD.labels["vhost"], connD.labels["node"], connD.labels["peer_host"], connD.labels["user"], connD.labels["state"], self).Add(1) } } diff --git a/exporter_overview.go b/exporter_overview.go index 4df1a68..1eaf0f2 100644 --- a/exporter_overview.go +++ b/exporter_overview.go @@ -1,10 +1,7 @@ package main import ( - "bytes" "context" - "encoding/json" - "io" log "github.com/Sirupsen/logrus" "github.com/prometheus/client_golang/prometheus" @@ -58,17 +55,15 @@ func (e *exporterOverview) Collect(ctx context.Context, ch chan<- prometheus.Met return err } - dec := json.NewDecoder(bytes.NewReader(body)) - var nodeInfo NodeInfo - if err := dec.Decode(&nodeInfo); err == io.EOF { - return err - } - e.nodeInfo = nodeInfo - reply, err := MakeReply(config, body) if err != nil { return err } + + e.nodeInfo.Node, _ = reply.GetString("node") + e.nodeInfo.ErlangVersion, _ = reply.GetString("erlang_version") + e.nodeInfo.RabbitmqVersion, _ = reply.GetString("rabbitmq_version") + rabbitMqOverviewData := reply.MakeMap() log.WithField("overviewData", rabbitMqOverviewData).Debug("Overview data") diff --git a/exporter_queue.go b/exporter_queue.go index 4b023cc..f39c8b5 100644 --- a/exporter_queue.go +++ b/exporter_queue.go @@ -13,8 +13,8 @@ func init() { } var ( - queueLabels = []string{"vhost", "queue", "durable", "policy"} - queueLabelKeys = []string{"vhost", "name", "durable", "policy", "state"} + queueLabels = []string{"vhost", "queue", "durable", "policy", "self"} + queueLabelKeys = []string{"vhost", "name", "durable", "policy", "state", "node"} queueGaugeVec = map[string]*prometheus.GaugeVec{ "messages_ready": newGaugeVec("queue_messages_ready", "Number of messages ready to be delivered to clients.", queueLabels), @@ -93,6 +93,11 @@ func (e exporterQueue) Collect(ctx context.Context, ch chan<- prometheus.Metric) } } + selfNode := "" + if n, ok := ctx.Value(nodeName).(string); ok { + selfNode = n + } + rabbitMqQueueData, err := getStatsInfo(config, "queues", queueLabelKeys) if err != nil { @@ -110,8 +115,12 @@ func (e exporterQueue) Collect(ctx context.Context, ch chan<- prometheus.Metric) if skipVhost := config.SkipVHost.MatchString(strings.ToLower(vname)); !skipVhost { if matchInclude := config.IncludeQueues.MatchString(strings.ToLower(qname)); matchInclude { if matchSkip := config.SkipQueues.MatchString(strings.ToLower(qname)); !matchSkip { + self := "0" + if queue.labels["node"] == selfNode { + self = "1" + } // log.WithFields(log.Fields{"vhost": queue.labels["vhost"], "queue": queue.labels["name"], "key": key, "value": value}).Info("Set queue metric for key") - gaugevec.WithLabelValues(queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"]).Set(value) + gaugevec.WithLabelValues(queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self).Set(value) } } } @@ -129,7 +138,11 @@ func (e exporterQueue) Collect(ctx context.Context, ch chan<- prometheus.Metric) if skipVhost := config.SkipVHost.MatchString(strings.ToLower(vname)); !skipVhost { if matchInclude := config.IncludeQueues.MatchString(strings.ToLower(qname)); matchInclude { if matchSkip := config.SkipQueues.MatchString(strings.ToLower(qname)); !matchSkip { - e.stateMetric.WithLabelValues(queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], queue.labels["state"]).Set(1) + self := "0" + if queue.labels["node"] == selfNode { + self = "1" + } + e.stateMetric.WithLabelValues(queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self, queue.labels["state"]).Set(1) } } } @@ -146,10 +159,14 @@ func (e exporterQueue) Collect(ctx context.Context, ch chan<- prometheus.Metric) if skipVhost := config.SkipVHost.MatchString(strings.ToLower(vname)); !skipVhost { if matchInclude := config.IncludeQueues.MatchString(strings.ToLower(qname)); matchInclude { if matchSkip := config.SkipQueues.MatchString(strings.ToLower(qname)); !matchSkip { + self := "0" + if queue.labels["node"] == selfNode { + self = "1" + } if value, ok := queue.metrics[key]; ok { - ch <- prometheus.MustNewConstMetric(countvec, prometheus.CounterValue, value, queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"]) + ch <- prometheus.MustNewConstMetric(countvec, prometheus.CounterValue, value, queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self) } else { - ch <- prometheus.MustNewConstMetric(countvec, prometheus.CounterValue, 0, queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"]) + ch <- prometheus.MustNewConstMetric(countvec, prometheus.CounterValue, 0, queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self) } } } diff --git a/exporter_test.go b/exporter_test.go index e1a701a..09e615e 100644 --- a/exporter_test.go +++ b/exporter_test.go @@ -16,7 +16,7 @@ const ( queuesTestData = `[{"memory":16056,"message_stats":{"disk_writes":6,"disk_writes_details":{"rate":0.4},"publish":6,"publish_details":{"rate":0.4}},"messages":6,"messages_details":{"rate":0.4},"messages_ready":6,"messages_ready_details":{"rate":0.4},"messages_unacknowledged":0,"messages_unacknowledged_details":{"rate":0.0},"idle_since":"2015-07-07 19:02:19","consumer_utilisation":"","policy":"","exclusive_consumer_tag":"","consumers":0,"recoverable_slaves":"","state":"flow","messages_ram":6,"messages_ready_ram":6,"messages_unacknowledged_ram":0,"messages_persistent":6,"message_bytes":30,"message_bytes_ready":30,"message_bytes_unacknowledged":0,"message_bytes_ram":30,"message_bytes_persistent":30,"disk_reads":0,"disk_writes":6,"backing_queue_status":{"q1":0,"q2":0,"delta":["delta","undefined",0,"undefined"],"q3":0,"q4":6,"len":6,"target_ram_count":"infinity","next_seq_id":6,"avg_ingress_rate":0.007658533940556533,"avg_egress_rate":0.0,"avg_ack_ingress_rate":0.0,"avg_ack_egress_rate":0.0},"name":"myQueue1","vhost":"/","durable":true,"auto_delete":false,"arguments":{},"node":"my-rabbit@ae74c041248b"},{"memory":55344,"message_stats":{"disk_reads":25,"disk_reads_details":{"rate":0.0}},"messages":25,"messages_details":{"rate":0.0},"messages_ready":25,"messages_ready_details":{"rate":0.0},"messages_unacknowledged":0,"messages_unacknowledged_details":{"rate":0.0},"idle_since":"2015-07-07 18:57:52","consumer_utilisation":"","policy":"ha-2","exclusive_consumer_tag":"","consumers":0,"recoverable_slaves":"","state":"running","messages_ram":25,"messages_ready_ram":25,"messages_unacknowledged_ram":0,"messages_persistent":25,"message_bytes":75,"message_bytes_ready":75,"message_bytes_unacknowledged":0,"message_bytes_ram":75,"message_bytes_persistent":75,"disk_reads":25,"disk_writes":0,"backing_queue_status":{"q1":0,"q2":0,"delta":["delta","undefined",0,"undefined"],"q3":24,"q4":1,"len":25,"target_ram_count":"infinity","next_seq_id":16384,"avg_ingress_rate":0.0,"avg_egress_rate":0.0,"avg_ack_ingress_rate":0.0,"avg_ack_egress_rate":0.0},"name":"myQueue2","vhost":"/","durable":true,"auto_delete":false,"arguments":{},"node":"my-rabbit@ae74c041248b"},{"memory":34648,"message_stats":{"disk_reads":23,"disk_reads_details":{"rate":0.0}},"messages":23,"messages_details":{"rate":0.0},"messages_ready":23,"messages_ready_details":{"rate":0.0},"messages_unacknowledged":0,"messages_unacknowledged_details":{"rate":0.0},"idle_since":"2015-07-07 18:57:52","consumer_utilisation":"","policy":"","exclusive_consumer_tag":"","consumers":0,"recoverable_slaves":"","state":"running","messages_ram":23,"messages_ready_ram":23,"messages_unacknowledged_ram":0,"messages_persistent":23,"message_bytes":207,"message_bytes_ready":207,"message_bytes_unacknowledged":0,"message_bytes_ram":207,"message_bytes_persistent":207,"disk_reads":23,"disk_writes":0,"backing_queue_status":{"q1":0,"q2":0,"delta":["delta","undefined",0,"undefined"],"q3":22,"q4":1,"len":23,"target_ram_count":"infinity","next_seq_id":16384,"avg_ingress_rate":0.0,"avg_egress_rate":0.0,"avg_ack_ingress_rate":0.0,"avg_ack_egress_rate":0.0},"name":"myQueue3","vhost":"/","durable":true,"auto_delete":false,"arguments":{},"node":"my-rabbit@ae74c041248b"},{"memory":13912,"messages":0,"messages_details":{"rate":0.0},"messages_ready":0,"messages_ready_details":{"rate":0.0},"messages_unacknowledged":0,"messages_unacknowledged_details":{"rate":0.0},"idle_since":"2015-07-07 18:57:52","consumer_utilisation":"","policy":"","exclusive_consumer_tag":"","consumers":0,"recoverable_slaves":"","state":"running","messages_ram":0,"messages_ready_ram":0,"messages_unacknowledged_ram":0,"messages_persistent":0,"message_bytes":0,"message_bytes_ready":0,"message_bytes_unacknowledged":0,"message_bytes_ram":0,"message_bytes_persistent":0,"disk_reads":0,"disk_writes":0,"backing_queue_status":{"q1":0,"q2":0,"delta":["delta","undefined",0,"undefined"],"q3":0,"q4":0,"len":0,"target_ram_count":"infinity","next_seq_id":0,"avg_ingress_rate":0.0,"avg_egress_rate":0.0,"avg_ack_ingress_rate":0.0,"avg_ack_egress_rate":0.0},"name":"myQueue4","vhost":"vhost4","durable":true,"auto_delete":false,"arguments":{},"node":"my-rabbit@ae74c041248b"}]` exchangeAPIResponse = `[{"name":"","vhost":"/","type":"direct","durable":true,"auto_delete":false,"internal":false,"arguments":{}},{"name":"amq.direct","vhost":"/","type":"direct","durable":true,"auto_delete":false,"internal":false,"arguments":{}},{"name":"amq.fanout","vhost":"/","type":"fanout","durable":true,"auto_delete":false,"internal":false,"arguments":{}},{"name":"amq.headers","vhost":"/","type":"headers","durable":true,"auto_delete":false,"internal":false,"arguments":{}},{"name":"amq.match","vhost":"/","type":"headers","durable":true,"auto_delete":false,"internal":false,"arguments":{}},{"name":"amq.rabbitmq.log","vhost":"/","type":"topic","durable":true,"auto_delete":false,"internal":true,"arguments":{}},{"name":"amq.rabbitmq.trace","vhost":"/","type":"topic","durable":true,"auto_delete":false,"internal":true,"arguments":{}},{"name":"amq.topic","vhost":"/","type":"topic","durable":true,"auto_delete":false,"internal":false,"arguments":{}},{"message_stats":{"publish":0,"publish_details":{"rate":0.0},"publish_in":5,"publish_in_details":{"rate":0.0},"publish_out":0,"publish_out_details":{"rate":0.0},"ack":0,"ack_details":{"rate":0.0},"deliver_get":0,"deliver_get_details":{"rate":0.0},"confirm":5,"confirm_details":{"rate":0.0},"return_unroutable":5,"return_unroutable_details":{"rate":0.0},"redeliver":0,"redeliver_details":{"rate":0.0}},"name":"myExchange","vhost":"/","type":"fanout","durable":true,"auto_delete":false,"internal":false,"arguments":{}}]` nodesAPIResponse = `[{"mem_used":150456032,"mem_used_details":{"rate":25176},"fd_used":55,"fd_used_details":{"rate":0},"sockets_used":0,"sockets_used_details":{"rate":0},"proc_used":226,"proc_used_details":{"rate":0},"disk_free":189045161984,"disk_free_details":{"rate":0},"partitions":["rabbit@ribbit-0","rabbit@ribbit-1","rabbit@ribbit-3","rabbit@ribbit-4"],"os_pid":"113","fd_total":1048576,"sockets_total":943626,"mem_limit":838395494,"mem_alarm":false,"disk_free_limit":50000000,"disk_free_alarm":false,"proc_total":1048576,"rates_mode":"basic","uptime":3772165,"run_queue":0,"processors":4,"name":"my-rabbit@5a00cd8fe2f4","type":"disc","running":true}]` - connectionAPIResponse = `[{"auth_mechanism": "PLAIN","channel_max": 65535,"channels": 1,"client_properties": {"copyright": "Copyright (c) 2007-2014 VMWare Inc, Tony Garnock-Jones, and Alan Antonuk.","information": "See https://github.com/alanxz/rabbitmq-c","platform": "linux-gn","product": "rabbitmq-c","version": "0.5.3-pre"},"connected_at": 1501868641834,"frame_max": 131072,"garbage_collection": {"fullsweep_after": 65535,"min_bin_vheap_size": 46422,"min_heap_size": 233,"minor_gcs": 3},"host": "172.31.15.10","name": "172.31.0.130:32769 -> 172.31.15.10:5672","node": "rabbit@rmq-cluster-node-04","peer_cert_issuer": null,"peer_cert_subject": null,"peer_cert_validity": null,"peer_host": "172.31.0.130","peer_port": 32769,"port": 5672,"protocol": "AMQP 0-9-1","recv_cnt": 22708,"recv_oct": 8905713,"recv_oct_details": {"rate": 169.6},"reductions": 6257210,"reductions_details": {"rate": 148.8},"send_cnt": 6,"send_oct": 573,"send_oct_details": {"rate": 0.0},"send_pend": 0,"ssl": false,"ssl_cipher": null,"ssl_hash": null,"ssl_key_exchange": null,"ssl_protocol": null,"state": "running","timeout": 0,"type": "network","user": "rmq_oms","vhost": "/"},{"auth_mechanism": "PLAIN","channel_max": 65535,"channels": 1,"client_properties": {"copyright": "Copyright (c) 2007-2014 VMWare Inc, Tony Garnock-Jones, and Alan Antonuk.","information": "See https://github.com/alanxz/rabbitmq-c","platform": "linux-gn","product": "rabbitmq-c","version": "0.5.3-pre"},"connected_at": 1501868641834,"frame_max": 131072,"garbage_collection": {"fullsweep_after": 65535,"min_bin_vheap_size": 46422,"min_heap_size": 233,"minor_gcs": 3},"host": "172.31.15.10","name": "172.31.0.130:32769 -> 172.31.15.10:5672","node": "rabbit@rmq-cluster-node-04","peer_cert_issuer": null,"peer_cert_subject": null,"peer_cert_validity": null,"peer_host": "172.31.0.130","peer_port": 32769,"port": 5672,"protocol": "AMQP 0-9-1","recv_cnt": 22708,"recv_oct": 8905713,"recv_oct_details": {"rate": 169.6},"reductions": 6257210,"reductions_details": {"rate": 148.8},"send_cnt": 6,"send_oct": 573,"send_oct_details": {"rate": 0.0},"send_pend": 0,"ssl": false,"ssl_cipher": null,"ssl_hash": null,"ssl_key_exchange": null,"ssl_protocol": null,"state": "running","timeout": 0,"type": "network","user": "rmq_oms","vhost": "/"}]` + connectionAPIResponse = `[{"auth_mechanism": "PLAIN","channel_max": 65535,"channels": 1,"client_properties": {"copyright": "Copyright (c) 2007-2014 VMWare Inc, Tony Garnock-Jones, and Alan Antonuk.","information": "See https://github.com/alanxz/rabbitmq-c","platform": "linux-gn","product": "rabbitmq-c","version": "0.5.3-pre"},"connected_at": 1501868641834,"frame_max": 131072,"garbage_collection": {"fullsweep_after": 65535,"min_bin_vheap_size": 46422,"min_heap_size": 233,"minor_gcs": 3},"host": "172.31.15.10","name": "172.31.0.130:32769 -> 172.31.15.10:5672","node": "my-rabbit@ae74c041248b","peer_cert_issuer": null,"peer_cert_subject": null,"peer_cert_validity": null,"peer_host": "172.31.0.130","peer_port": 32769,"port": 5672,"protocol": "AMQP 0-9-1","recv_cnt": 22708,"recv_oct": 8905713,"recv_oct_details": {"rate": 169.6},"reductions": 6257210,"reductions_details": {"rate": 148.8},"send_cnt": 6,"send_oct": 573,"send_oct_details": {"rate": 0.0},"send_pend": 0,"ssl": false,"ssl_cipher": null,"ssl_hash": null,"ssl_key_exchange": null,"ssl_protocol": null,"state": "running","timeout": 0,"type": "network","user": "rmq_oms","vhost": "/"},{"auth_mechanism": "PLAIN","channel_max": 65535,"channels": 1,"client_properties": {"copyright": "Copyright (c) 2007-2014 VMWare Inc, Tony Garnock-Jones, and Alan Antonuk.","information": "See https://github.com/alanxz/rabbitmq-c","platform": "linux-gn","product": "rabbitmq-c","version": "0.5.3-pre"},"connected_at": 1501868641834,"frame_max": 131072,"garbage_collection": {"fullsweep_after": 65535,"min_bin_vheap_size": 46422,"min_heap_size": 233,"minor_gcs": 3},"host": "172.31.15.10","name": "172.31.0.130:32769 -> 172.31.15.10:5672","node": "rabbit@rmq-cluster-node-04","peer_cert_issuer": null,"peer_cert_subject": null,"peer_cert_validity": null,"peer_host": "172.31.0.130","peer_port": 32769,"port": 5672,"protocol": "AMQP 0-9-1","recv_cnt": 22708,"recv_oct": 8905713,"recv_oct_details": {"rate": 169.6},"reductions": 6257210,"reductions_details": {"rate": 148.8},"send_cnt": 6,"send_oct": 573,"send_oct_details": {"rate": 0.0},"send_pend": 0,"ssl": false,"ssl_cipher": null,"ssl_hash": null,"ssl_key_exchange": null,"ssl_protocol": null,"state": "running","timeout": 0,"type": "network","user": "rmq_oms","vhost": "/"}]` ) func expectSubstring(t *testing.T, body string, substr string) { @@ -75,7 +75,7 @@ func TestWholeApp(t *testing.T) { t.Errorf("Home page didn't return %v", http.StatusOK) } body := w.Body.String() - // t.Log(body) + //t.Log(body) expectSubstring(t, body, `rabbitmq_up 1`) // overview @@ -90,16 +90,16 @@ func TestWholeApp(t *testing.T) { expectSubstring(t, body, `rabbitmq_partitions{node="my-rabbit@5a00cd8fe2f4"} 4`) // queue - expectSubstring(t, body, `rabbitmq_queue_messages_ready{durable="true",policy="ha-2",queue="myQueue2",vhost="/"} 25`) - expectSubstring(t, body, `rabbitmq_queue_memory{durable="true",policy="",queue="myQueue4",vhost="vhost4"} 13912`) - expectSubstring(t, body, `rabbitmq_queue_messages_published_total{durable="true",policy="",queue="myQueue1",vhost="/"} 6`) - expectSubstring(t, body, `rabbitmq_queue_disk_writes_total{durable="true",policy="",queue="myQueue1",vhost="/"} 6`) - expectSubstring(t, body, `rabbitmq_queue_messages_delivered_total{durable="true",policy="",queue="myQueue1",vhost="/"} 0`) + expectSubstring(t, body, `rabbitmq_queue_messages_ready{durable="true",policy="ha-2",queue="myQueue2",self="1",vhost="/"} 25`) + expectSubstring(t, body, `rabbitmq_queue_memory{durable="true",policy="",queue="myQueue4",self="1",vhost="vhost4"} 13912`) + expectSubstring(t, body, `rabbitmq_queue_messages_published_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 6`) + expectSubstring(t, body, `rabbitmq_queue_disk_writes_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 6`) + expectSubstring(t, body, `rabbitmq_queue_messages_delivered_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 0`) // exchange expectSubstring(t, body, `rabbitmq_exchange_messages_published_in_total{exchange="myExchange",vhost="/"} 5`) // connection - dontExpectSubstring(t, body, `rabbitmq_connection_channels{node="rabbit@rmq-cluster-node-04",peer_host="172.31.0.130",user="rmq_oms",vhost="/"} 2`) - dontExpectSubstring(t, body, `rabbitmq_connection_received_packets{node="rabbit@rmq-cluster-node-04",peer_host="172.31.0.130",user="rmq_oms",vhost="/"} 45416`) + dontExpectSubstring(t, body, `rabbitmq_connection_channels{node="rabbit@rmq-cluster-node-04",peer_host="172.31.0.130",self="1",user="rmq_oms",vhost="/"} 2`) + dontExpectSubstring(t, body, `rabbitmq_connection_received_packets{node="rabbit@rmq-cluster-node-04",peer_host="172.31.0.130",self="1",user="rmq_oms",vhost="/"} 45416`) } func TestWholeAppInverted(t *testing.T) { @@ -142,7 +142,7 @@ func TestWholeAppInverted(t *testing.T) { t.Errorf("Home page didn't return %v", http.StatusOK) } body := w.Body.String() - + t.Log(body) expectSubstring(t, body, `rabbitmq_up 1`) // overview is always scraped and exported @@ -157,16 +157,16 @@ func TestWholeAppInverted(t *testing.T) { dontExpectSubstring(t, body, `rabbitmq_partitions{node="my-rabbit@5a00cd8fe2f4"} 4`) // queue - dontExpectSubstring(t, body, `rabbitmq_queue_messages_ready{durable="true",policy="ha-2",queue="myQueue2",vhost="/"} 25`) - dontExpectSubstring(t, body, `rabbitmq_queue_memory{durable="true",policy="",queue="myQueue4",vhost="vhost4"} 13912`) - dontExpectSubstring(t, body, `rabbitmq_queue_messages_published_total{durable="true",policy="",queue="myQueue1",vhost="/"} 6`) - dontExpectSubstring(t, body, `rabbitmq_queue_disk_writes_total{durable="true",policy="",queue="myQueue1",vhost="/"} 6`) - dontExpectSubstring(t, body, `rabbitmq_queue_messages_delivered_total{durable="true",policy="",queue="myQueue1",vhost="/"} 0`) + dontExpectSubstring(t, body, `rabbitmq_queue_messages_ready{durable="true",policy="ha-2",queue="myQueue2",self="1",vhost="/"} 25`) + dontExpectSubstring(t, body, `rabbitmq_queue_memory{durable="true",policy="",queue="myQueue4",self="1",vhost="vhost4"} 13912`) + dontExpectSubstring(t, body, `rabbitmq_queue_messages_published_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 6`) + dontExpectSubstring(t, body, `rabbitmq_queue_disk_writes_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 6`) + dontExpectSubstring(t, body, `rabbitmq_queue_messages_delivered_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 0`) // exchange dontExpectSubstring(t, body, `rabbitmq_exchange_messages_published_in_total{exchange="myExchange",vhost="/"} 5`) // connection - expectSubstring(t, body, `rabbitmq_connection_channels{node="rabbit@rmq-cluster-node-04",peer_host="172.31.0.130",user="rmq_oms",vhost="/"} 2`) - expectSubstring(t, body, `rabbitmq_connection_received_packets{node="rabbit@rmq-cluster-node-04",peer_host="172.31.0.130",user="rmq_oms",vhost="/"} 45416`) + expectSubstring(t, body, `rabbitmq_connection_channels{node="rabbit@rmq-cluster-node-04",peer_host="172.31.0.130",self="0",user="rmq_oms",vhost="/"} 1`) + expectSubstring(t, body, `rabbitmq_connection_received_packets{node="rabbit@rmq-cluster-node-04",peer_host="172.31.0.130",self="0",user="rmq_oms",vhost="/"} 22708`) } func TestAppMaxQueues(t *testing.T) { @@ -225,18 +225,18 @@ func TestAppMaxQueues(t *testing.T) { expectSubstring(t, body, `rabbitmq_partitions{node="my-rabbit@5a00cd8fe2f4"} 4`) // queue - dontExpectSubstring(t, body, `rabbitmq_queue_messages_ready{durable="true",policy="ha-2",queue="myQueue2",vhost="/"} 25`) - dontExpectSubstring(t, body, `rabbitmq_queue_memory{durable="true",policy="",queue="myQueue4",vhost="vhost4"} 13912`) - dontExpectSubstring(t, body, `rabbitmq_queue_messages_published_total{durable="true",policy="",queue="myQueue1",vhost="/"} 6`) - dontExpectSubstring(t, body, `rabbitmq_queue_disk_writes_total{durable="true",policy="",queue="myQueue1",vhost="/"} 6`) - dontExpectSubstring(t, body, `rabbitmq_queue_messages_delivered_total{durable="true",policy="",queue="myQueue1",vhost="/"} 0`) + dontExpectSubstring(t, body, `rabbitmq_queue_messages_ready{durable="true",policy="ha-2",queue="myQueue2",self="1",vhost="/"} 25`) + dontExpectSubstring(t, body, `rabbitmq_queue_memory{durable="true",policy="",queue="myQueue4",self="1",vhost="vhost4"} 13912`) + dontExpectSubstring(t, body, `rabbitmq_queue_messages_published_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 6`) + dontExpectSubstring(t, body, `rabbitmq_queue_disk_writes_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 6`) + dontExpectSubstring(t, body, `rabbitmq_queue_messages_delivered_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 0`) // exchange expectSubstring(t, body, `rabbitmq_exchange_messages_published_in_total{exchange="myExchange",vhost="/"} 5`) // connection - dontExpectSubstring(t, body, `rabbitmq_connection_channels{node="rabbit@rmq-cluster-node-04",peer_host="172.31.0.130",user="rmq_oms",vhost="/"} 2`) - dontExpectSubstring(t, body, `rabbitmq_connection_received_packets{node="rabbit@rmq-cluster-node-04",peer_host="172.31.0.130",user="rmq_oms",vhost="/"} 45416`) + dontExpectSubstring(t, body, `rabbitmq_connection_channels{node="rabbit@rmq-cluster-node-04",peer_host="172.31.0.130",self="1",user="rmq_oms",vhost="/"} 2`) + dontExpectSubstring(t, body, `rabbitmq_connection_received_packets{node="rabbit@rmq-cluster-node-04",peer_host="172.31.0.130",self="1",user="rmq_oms",vhost="/"} 45416`) } func TestRabbitError(t *testing.T) { @@ -340,18 +340,18 @@ func TestResetMetricsOnRabbitFailure(t *testing.T) { expectSubstring(t, body, `rabbitmq_partitions{node="my-rabbit@5a00cd8fe2f4"} 4`) // queue - expectSubstring(t, body, `rabbitmq_queue_messages_ready{durable="true",policy="ha-2",queue="myQueue2",vhost="/"} 25`) - expectSubstring(t, body, `rabbitmq_queue_memory{durable="true",policy="",queue="myQueue4",vhost="vhost4"} 13912`) - expectSubstring(t, body, `rabbitmq_queue_messages_published_total{durable="true",policy="",queue="myQueue1",vhost="/"} 6`) - expectSubstring(t, body, `rabbitmq_queue_disk_writes_total{durable="true",policy="",queue="myQueue1",vhost="/"} 6`) - expectSubstring(t, body, `rabbitmq_queue_messages_delivered_total{durable="true",policy="",queue="myQueue1",vhost="/"} 0`) + expectSubstring(t, body, `rabbitmq_queue_messages_ready{durable="true",policy="ha-2",queue="myQueue2",self="1",vhost="/"} 25`) + expectSubstring(t, body, `rabbitmq_queue_memory{durable="true",policy="",queue="myQueue4",self="1",vhost="vhost4"} 13912`) + expectSubstring(t, body, `rabbitmq_queue_messages_published_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 6`) + expectSubstring(t, body, `rabbitmq_queue_disk_writes_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 6`) + expectSubstring(t, body, `rabbitmq_queue_messages_delivered_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 0`) // exchange expectSubstring(t, body, `rabbitmq_exchange_messages_published_in_total{exchange="myExchange",vhost="/"} 5`) // connection - expectSubstring(t, body, `rabbitmq_connection_channels{node="rabbit@rmq-cluster-node-04",peer_host="172.31.0.130",user="rmq_oms",vhost="/"} 2`) - expectSubstring(t, body, `rabbitmq_connection_received_packets{node="rabbit@rmq-cluster-node-04",peer_host="172.31.0.130",user="rmq_oms",vhost="/"} 45416`) + expectSubstring(t, body, `rabbitmq_connection_channels{node="rabbit@rmq-cluster-node-04",peer_host="172.31.0.130",self="0",user="rmq_oms",vhost="/"} 1`) + expectSubstring(t, body, `rabbitmq_connection_received_packets{node="rabbit@rmq-cluster-node-04",peer_host="172.31.0.130",self="0",user="rmq_oms",vhost="/"} 22708`) }) t.Run("Rabbit Queue Endpoint down -> 'rabbitmq_up 0' and queue metrics are missing", func(t *testing.T) { @@ -383,18 +383,18 @@ func TestResetMetricsOnRabbitFailure(t *testing.T) { expectSubstring(t, body, `rabbitmq_partitions{node="my-rabbit@5a00cd8fe2f4"} 4`) // queue - dontExpectSubstring(t, body, `rabbitmq_queue_messages_ready{durable="true",policy="ha-2",queue="myQueue2",vhost="/"}`) - dontExpectSubstring(t, body, `rabbitmq_queue_memory{durable="true",policy="",queue="myQueue4",vhost="vhost4"}`) - dontExpectSubstring(t, body, `rabbitmq_queue_messages_published_total{durable="true",policy="",queue="myQueue1",vhost="/"}`) - dontExpectSubstring(t, body, `rabbitmq_queue_disk_writes_total{durable="true",policy="",queue="myQueue1",vhost="/"}`) - dontExpectSubstring(t, body, `rabbitmq_queue_messages_delivered_total{durable="true",policy="",queue="myQueue1",vhost="/"}`) + dontExpectSubstring(t, body, `rabbitmq_queue_messages_ready{durable="true",policy="ha-2",queue="myQueue2",self="1",vhost="/"}`) + dontExpectSubstring(t, body, `rabbitmq_queue_memory{durable="true",policy="",queue="myQueue4",self="1",vhost="vhost4"}`) + dontExpectSubstring(t, body, `rabbitmq_queue_messages_published_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"}`) + dontExpectSubstring(t, body, `rabbitmq_queue_disk_writes_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"}`) + dontExpectSubstring(t, body, `rabbitmq_queue_messages_delivered_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"}`) // exchange expectSubstring(t, body, `rabbitmq_exchange_messages_published_in_total{exchange="myExchange",vhost="/"} 5`) // connection - expectSubstring(t, body, `rabbitmq_connection_channels{node="rabbit@rmq-cluster-node-04",peer_host="172.31.0.130",user="rmq_oms",vhost="/"} 2`) - expectSubstring(t, body, `rabbitmq_connection_received_packets{node="rabbit@rmq-cluster-node-04",peer_host="172.31.0.130",user="rmq_oms",vhost="/"} 45416`) + expectSubstring(t, body, `rabbitmq_connection_channels{node="rabbit@rmq-cluster-node-04",peer_host="172.31.0.130",self="0",user="rmq_oms",vhost="/"} 1`) + expectSubstring(t, body, `rabbitmq_connection_received_packets{node="rabbit@rmq-cluster-node-04",peer_host="172.31.0.130",self="0",user="rmq_oms",vhost="/"} 22708`) }) t.Run("RabbitMQ is down -> all metrics are missing except 'rabbitmq_up 0'", func(t *testing.T) { @@ -426,18 +426,18 @@ func TestResetMetricsOnRabbitFailure(t *testing.T) { dontExpectSubstring(t, body, `rabbitmq_partitions{node="my-rabbit@5a00cd8fe2f4"}`) // queue - dontExpectSubstring(t, body, `rabbitmq_queue_messages_ready{durable="true",policy="ha-2",queue="myQueue2",vhost="/"}`) - dontExpectSubstring(t, body, `rabbitmq_queue_memory{durable="true",policy="",queue="myQueue4",vhost="vhost4"}`) - dontExpectSubstring(t, body, `rabbitmq_queue_messages_published_total{durable="true",policy="",queue="myQueue1",vhost="/"}`) - dontExpectSubstring(t, body, `rabbitmq_queue_disk_writes_total{durable="true",policy="",queue="myQueue1",vhost="/"}`) - dontExpectSubstring(t, body, `rabbitmq_queue_messages_delivered_total{durable="true",policy="",queue="myQueue1",vhost="/"}`) + dontExpectSubstring(t, body, `rabbitmq_queue_messages_ready{durable="true",policy="ha-2",queue="myQueue2",self="1",vhost="/"}`) + dontExpectSubstring(t, body, `rabbitmq_queue_memory{durable="true",policy="",queue="myQueue4",self="1",vhost="vhost4"}`) + dontExpectSubstring(t, body, `rabbitmq_queue_messages_published_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"}`) + dontExpectSubstring(t, body, `rabbitmq_queue_disk_writes_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"}`) + dontExpectSubstring(t, body, `rabbitmq_queue_messages_delivered_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"}`) // exchange dontExpectSubstring(t, body, `rabbitmq_exchange_messages_published_in_total{exchange="myExchange",vhost="/"}`) // connection - dontExpectSubstring(t, body, `rabbitmq_connection_channels{node="rabbit@rmq-cluster-node-04",peer_host="172.31.0.130",user="rmq_oms",vhost="/"}`) - dontExpectSubstring(t, body, `rabbitmq_connection_received_packets{node="rabbit@rmq-cluster-node-04",peer_host="172.31.0.130",user="rmq_oms",vhost="/"}`) + dontExpectSubstring(t, body, `rabbitmq_connection_channels{node="rabbit@rmq-cluster-node-04",peer_host="172.31.0.130",self="1",user="rmq_oms",vhost="/"}`) + dontExpectSubstring(t, body, `rabbitmq_connection_received_packets{node="rabbit@rmq-cluster-node-04",peer_host="172.31.0.130",self="1",user="rmq_oms",vhost="/"}`) }) } @@ -486,10 +486,12 @@ func TestQueueState(t *testing.T) { expectSubstring(t, body, `rabbitmq_up 1`) // queue - expectSubstring(t, body, `rabbitmq_queue_state{durable="true",policy="",queue="myQueue1",state="flow",vhost="/"} 1`) - expectSubstring(t, body, `rabbitmq_queue_state{durable="true",policy="",queue="myQueue3",state="running",vhost="/"} 1`) - expectSubstring(t, body, `rabbitmq_queue_state{durable="true",policy="ha-2",queue="myQueue2",state="running",vhost="/"} 1`) + expectSubstring(t, body, `rabbitmq_queue_state{durable="true",policy="",queue="myQueue1",self="1",state="flow",vhost="/"} 1`) + expectSubstring(t, body, `rabbitmq_queue_state{durable="true",policy="",queue="myQueue3",self="1",state="running",vhost="/"} 1`) + expectSubstring(t, body, `rabbitmq_queue_state{durable="true",policy="ha-2",queue="myQueue2",self="1",state="running",vhost="/"} 1`) // connections - expectSubstring(t, body, `rabbitmq_connection_status{node="rabbit@rmq-cluster-node-04",peer_host="172.31.0.130",state="running",user="rmq_oms",vhost="/"} 2`) + expectSubstring(t, body, `rabbitmq_connection_status{node="rabbit@rmq-cluster-node-04",peer_host="172.31.0.130",self="0",state="running",user="rmq_oms",vhost="/"} 1`) + expectSubstring(t, body, `rabbitmq_connection_status{node="my-rabbit@ae74c041248b",peer_host="172.31.0.130",self="1",state="running",user="rmq_oms",vhost="/"} 1`) + } diff --git a/integration_test.go b/integration_test.go index b59cf1f..f8743e3 100644 --- a/integration_test.go +++ b/integration_test.go @@ -68,7 +68,7 @@ func TestQueueCount(t *testing.T) { // log.Println(testenv.GetOrDie(env.ManagementURL()+"/api/queues", 5*time.Second)) body := testenv.GetOrDie(exporterURL, 5*time.Second) - search := fmt.Sprintf(`rabbitmq_queue_head_message_timestamp{durable="true",policy="",queue="%s",vhost="/"} %1.9e`, queue, float64(timestamp.Unix())) + search := fmt.Sprintf(`rabbitmq_queue_head_message_timestamp{durable="true",policy="",queue="%s",self="1",vhost="/"} %1.9e`, queue, float64(timestamp.Unix())) i := strings.Index(body, search) if i == -1 { @@ -84,7 +84,7 @@ func TestQueueCount(t *testing.T) { time.Sleep(5 * time.Second) // give rabbitmq management plugin a bit of time body := testenv.GetOrDie(exporterURL, 5*time.Second) - search := fmt.Sprintf(`rabbitmq_queue_messages{durable="true",policy="",queue="%s",vhost="/"} 0`, queue) + search := fmt.Sprintf(`rabbitmq_queue_messages{durable="true",policy="",queue="%s",self="1",vhost="/"} 0`, queue) i := strings.Index(body, search) if i == -1 { @@ -101,7 +101,7 @@ func TestQueueCount(t *testing.T) { body := testenv.GetOrDie(exporterURL, 5*time.Second) - search := fmt.Sprintf(`rabbitmq_queue_messages{durable="false",policy="",queue="%s",vhost="/"} 0`, queue) + search := fmt.Sprintf(`rabbitmq_queue_messages{durable="false",policy="",queue="%s",self="1",vhost="/"} 0`, queue) i := strings.Index(body, search) if i == -1 { @@ -120,7 +120,7 @@ func TestQueueCount(t *testing.T) { time.Sleep(10 * time.Second) // give rabbitmq management plugin a bit of time body := testenv.GetOrDie(exporterURL, 5*time.Second) - search := fmt.Sprintf(`rabbitmq_queue_messages{durable="false",policy="%s",queue="%s",vhost="/"} 0`, policy, queue) + search := fmt.Sprintf(`rabbitmq_queue_messages{durable="false",policy="%s",queue="%s",self="1",vhost="/"} 0`, policy, queue) i := strings.Index(body, search) if i == -1 { // t.Log(env.ManagementURL()) diff --git a/jsonmap.go b/jsonmap.go index 852a6ac..aa4e8d6 100644 --- a/jsonmap.go +++ b/jsonmap.go @@ -9,25 +9,25 @@ import ( ) type rabbitJSONReply struct { - decoder *json.Decoder + body []byte + keys map[string]interface{} } -func makeJSONReply(body []byte) RabbitReply { - decoder := json.NewDecoder(bytes.NewBuffer(body)) - return &rabbitJSONReply{decoder} +func makeJSONReply(body []byte) (RabbitReply, error) { + return &rabbitJSONReply{body, nil}, nil } //MakeStatsInfo creates a slice of StatsInfo from json input. Only keys with float values are mapped into `metrics`. func (rep *rabbitJSONReply) MakeStatsInfo(labels []string) []StatsInfo { var statistics []StatsInfo var jsonArr []map[string]interface{} - - if rep.decoder == nil { + decoder := json.NewDecoder(bytes.NewBuffer(rep.body)) + if decoder == nil { log.Error("JSON decoder not iniatilized") return make([]StatsInfo, 0) } - if err := rep.decoder.Decode(&jsonArr); err != nil { + if err := decoder.Decode(&jsonArr); err != nil { log.WithField("error", err).Error("Error while decoding json") return make([]StatsInfo, 0) } @@ -61,13 +61,13 @@ func (rep *rabbitJSONReply) MakeStatsInfo(labels []string) []StatsInfo { func (rep *rabbitJSONReply) MakeMap() MetricMap { flMap := make(MetricMap) var output map[string]interface{} - - if rep.decoder == nil { + decoder := json.NewDecoder(bytes.NewBuffer(rep.body)) + if decoder == nil { log.Error("JSON decoder not iniatilized") return flMap } - if err := rep.decoder.Decode(&output); err != nil { + if err := decoder.Decode(&output); err != nil { log.WithField("error", err).Error("Error while decoding json") return flMap } @@ -99,3 +99,25 @@ func addFields(toMap *MetricMap, basename string, source map[string]interface{}) } } } + +func (rep *rabbitJSONReply) GetString(key string) (string, bool) { + if rep.keys == nil { + keys := make(map[string]interface{}) + decoder := json.NewDecoder(bytes.NewBuffer(rep.body)) + if decoder == nil { + log.Error("JSON decoder not iniatilized") + return "", false + } + err := decoder.Decode(&keys) + if err != nil { + return "", false + } + rep.keys = keys + } + val, ok := rep.keys[key] + if !ok { + return "", false + } + value, ok := val.(string) + return value, ok +} diff --git a/jsonmap_test.go b/jsonmap_test.go index f7a6b0f..b94a491 100644 --- a/jsonmap_test.go +++ b/jsonmap_test.go @@ -5,7 +5,7 @@ import ( ) func TestWithInvalidJSON(t *testing.T) { - invalidJSONReply := makeJSONReply([]byte("I'm no json")) + invalidJSONReply, _ := makeJSONReply([]byte("I'm no json")) if mm := invalidJSONReply.MakeMap(); mm == nil { t.Errorf("Json is invalid. Empty map should be returned. Value: %v", mm) @@ -34,14 +34,14 @@ func checkMap(flMap map[string]float64, t *testing.T, addValue float64) { } func TestMakeMap(t *testing.T) { - reply := makeJSONReply([]byte(`{"FloatKey":4, "st":"string","nes":{"ted":5}}`)) + reply, _ := makeJSONReply([]byte(`{"FloatKey":4, "st":"string","nes":{"ted":5}}`)) flMap := reply.MakeMap() checkMap(flMap, t, 0) } func TestMakeStatsInfo(t *testing.T) { - reply := makeJSONReply([]byte(`[{"name":"q1", "FloatKey":14,"nes":{"ted":15}},{"name":"q2", "vhost":"foo", "FloatKey":24,"nes":{"ted":25}}]`)) + reply, _ := makeJSONReply([]byte(`[{"name":"q1", "FloatKey":14,"nes":{"ted":15}},{"name":"q2", "vhost":"foo", "FloatKey":24,"nes":{"ted":25}}]`)) qinfo := reply.MakeStatsInfo(queueLabelKeys) t.Log(qinfo) @@ -59,7 +59,7 @@ func TestMakeStatsInfo(t *testing.T) { } func TestArraySize(t *testing.T) { - reply := makeJSONReply([]byte(`{"node":"node1","empty_partitions": [],"partitions": [{"name":"node1"},{"name":"node2"}]}`)) + reply, _ := makeJSONReply([]byte(`{"node":"node1","empty_partitions": [],"partitions": [{"name":"node1"},{"name":"node2"}]}`)) node := reply.MakeMap() t.Log(node)