Skip to content

Commit

Permalink
added cluster label to queues
Browse files Browse the repository at this point in the history
  • Loading branch information
kbudde committed Mar 11, 2019
1 parent 9846ad5 commit 4bceb92
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 49 deletions.
2 changes: 2 additions & 0 deletions exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
endpointScrapeDuration contextValues = "endpointScrapeDuration"
endpointUpMetric contextValues = "endpointUpMetric"
nodeName contextValues = "node"
clusterName contextValues = "cluster"
)

//RegisterExporter makes an exporter available by the provided name.
Expand Down Expand Up @@ -94,6 +95,7 @@ func (e *exporter) Collect(ch chan<- prometheus.Metric) {
}

ctx = context.WithValue(ctx, nodeName, e.overviewExporter.NodeInfo().Node)
ctx = context.WithValue(ctx, clusterName, e.overviewExporter.NodeInfo().ClusterName)

for name, ex := range e.exporter {

Expand Down
14 changes: 9 additions & 5 deletions exporter_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func init() {
}

var (
queueLabels = []string{"vhost", "queue", "durable", "policy", "self"}
queueLabels = []string{"cluster", "vhost", "queue", "durable", "policy", "self"}
queueLabelKeys = []string{"vhost", "name", "durable", "policy", "state", "node"}

queueGaugeVec = map[string]*prometheus.GaugeVec{
Expand Down Expand Up @@ -113,6 +113,10 @@ func (e exporterQueue) Collect(ctx context.Context, ch chan<- prometheus.Metric)
if n, ok := ctx.Value(nodeName).(string); ok {
selfNode = n
}
cluster := ""
if n, ok := ctx.Value(clusterName).(string); ok {
cluster = n
}

rabbitMqQueueData, err := getStatsInfo(config, "queues", queueLabelKeys)

Expand All @@ -136,7 +140,7 @@ func (e exporterQueue) Collect(ctx context.Context, ch chan<- prometheus.Metric)
self = "1"
}
// log.WithFields(log.Fields{"vhost": queue.labels["vhost"], "queue": queue.labels["name"], "key": key, "value": value}).Info("Set queue metric for key")
gaugevec.WithLabelValues(queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self).Set(value)
gaugevec.WithLabelValues(cluster, queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self).Set(value)
}
}
}
Expand All @@ -158,7 +162,7 @@ func (e exporterQueue) Collect(ctx context.Context, ch chan<- prometheus.Metric)
if queue.labels["node"] == selfNode {
self = "1"
}
e.stateMetric.WithLabelValues(queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self, queue.labels["state"]).Set(1)
e.stateMetric.WithLabelValues(cluster, queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self, queue.labels["state"]).Set(1)
}
}
}
Expand All @@ -180,9 +184,9 @@ func (e exporterQueue) Collect(ctx context.Context, ch chan<- prometheus.Metric)
self = "1"
}
if value, ok := queue.metrics[key]; ok {
ch <- prometheus.MustNewConstMetric(countvec, prometheus.CounterValue, value, queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self)
ch <- prometheus.MustNewConstMetric(countvec, prometheus.CounterValue, value, cluster, queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self)
} else {
ch <- prometheus.MustNewConstMetric(countvec, prometheus.CounterValue, 0, queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self)
ch <- prometheus.MustNewConstMetric(countvec, prometheus.CounterValue, 0, cluster, queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self)
}
}
}
Expand Down
72 changes: 36 additions & 36 deletions exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ func TestWholeApp(t *testing.T) {
expectSubstring(t, body, `rabbitmq_partitions{node="my-rabbit@5a00cd8fe2f4",self="0"} 4`)

// queue
expectSubstring(t, body, `rabbitmq_queue_messages_ready{durable="true",policy="ha-2",queue="myQueue2",self="1",vhost="/"} 25`)
expectSubstring(t, body, `rabbitmq_queue_memory{durable="true",policy="",queue="myQueue4",self="1",vhost="vhost4"} 13912`)
expectSubstring(t, body, `rabbitmq_queue_messages_published_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 6`)
expectSubstring(t, body, `rabbitmq_queue_disk_writes_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 6`)
expectSubstring(t, body, `rabbitmq_queue_messages_delivered_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 0`)
expectSubstring(t, body, `rabbitmq_queue_messages_ready{cluster="my-rabbit@ae74c041248b",durable="true",policy="ha-2",queue="myQueue2",self="1",vhost="/"} 25`)
expectSubstring(t, body, `rabbitmq_queue_memory{cluster="my-rabbit@ae74c041248b",durable="true",policy="",queue="myQueue4",self="1",vhost="vhost4"} 13912`)
expectSubstring(t, body, `rabbitmq_queue_messages_published_total{cluster="my-rabbit@ae74c041248b",durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 6`)
expectSubstring(t, body, `rabbitmq_queue_disk_writes_total{cluster="my-rabbit@ae74c041248b",durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 6`)
expectSubstring(t, body, `rabbitmq_queue_messages_delivered_total{cluster="my-rabbit@ae74c041248b",durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 0`)
// exchange
expectSubstring(t, body, `rabbitmq_exchange_messages_published_in_total{exchange="myExchange",vhost="/"} 5`)
// connection
Expand Down Expand Up @@ -149,11 +149,11 @@ func TestWholeAppInverted(t *testing.T) {
dontExpectSubstring(t, body, `rabbitmq_partitions{node="my-rabbit@5a00cd8fe2f4"} 4`)

// queue
dontExpectSubstring(t, body, `rabbitmq_queue_messages_ready{durable="true",policy="ha-2",queue="myQueue2",self="1",vhost="/"} 25`)
dontExpectSubstring(t, body, `rabbitmq_queue_memory{durable="true",policy="",queue="myQueue4",self="1",vhost="vhost4"} 13912`)
dontExpectSubstring(t, body, `rabbitmq_queue_messages_published_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 6`)
dontExpectSubstring(t, body, `rabbitmq_queue_disk_writes_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 6`)
dontExpectSubstring(t, body, `rabbitmq_queue_messages_delivered_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 0`)
dontExpectSubstring(t, body, `rabbitmq_queue_messages_ready{cluster="my-rabbit@ae74c041248b",durable="true",policy="ha-2",queue="myQueue2",self="1",vhost="/"} 25`)
dontExpectSubstring(t, body, `rabbitmq_queue_memory{cluster="my-rabbit@ae74c041248b",durable="true",policy="",queue="myQueue4",self="1",vhost="vhost4"} 13912`)
dontExpectSubstring(t, body, `rabbitmq_queue_messages_published_total{cluster="my-rabbit@ae74c041248b",durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 6`)
dontExpectSubstring(t, body, `rabbitmq_queue_disk_writes_total{cluster="my-rabbit@ae74c041248b",durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 6`)
dontExpectSubstring(t, body, `rabbitmq_queue_messages_delivered_total{cluster="my-rabbit@ae74c041248b",durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 0`)
// exchange
dontExpectSubstring(t, body, `rabbitmq_exchange_messages_published_in_total{exchange="myExchange",vhost="/"} 5`)
// connection
Expand Down Expand Up @@ -200,11 +200,11 @@ func TestAppMaxQueues(t *testing.T) {
expectSubstring(t, body, `rabbitmq_partitions{node="my-rabbit@5a00cd8fe2f4",self="0"} 4`)

// queue
dontExpectSubstring(t, body, `rabbitmq_queue_messages_ready{durable="true",policy="ha-2",queue="myQueue2",self="1",vhost="/"} 25`)
dontExpectSubstring(t, body, `rabbitmq_queue_memory{durable="true",policy="",queue="myQueue4",self="1",vhost="vhost4"} 13912`)
dontExpectSubstring(t, body, `rabbitmq_queue_messages_published_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 6`)
dontExpectSubstring(t, body, `rabbitmq_queue_disk_writes_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 6`)
dontExpectSubstring(t, body, `rabbitmq_queue_messages_delivered_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 0`)
dontExpectSubstring(t, body, `rabbitmq_queue_messages_ready{cluster="my-rabbit@ae74c041248b",durable="true",policy="ha-2",queue="myQueue2",self="1",vhost="/"} 25`)
dontExpectSubstring(t, body, `rabbitmq_queue_memory{cluster="my-rabbit@ae74c041248b",durable="true",policy="",queue="myQueue4",self="1",vhost="vhost4"} 13912`)
dontExpectSubstring(t, body, `rabbitmq_queue_messages_published_total{cluster="my-rabbit@ae74c041248b",durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 6`)
dontExpectSubstring(t, body, `rabbitmq_queue_disk_writes_total{cluster="my-rabbit@ae74c041248b",durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 6`)
dontExpectSubstring(t, body, `rabbitmq_queue_messages_delivered_total{cluster="my-rabbit@ae74c041248b",durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 0`)

// exchange
expectSubstring(t, body, `rabbitmq_exchange_messages_published_in_total{exchange="myExchange",vhost="/"} 5`)
Expand Down Expand Up @@ -315,11 +315,11 @@ func TestResetMetricsOnRabbitFailure(t *testing.T) {
expectSubstring(t, body, `rabbitmq_partitions{node="my-rabbit@5a00cd8fe2f4",self="0"} 4`)

// queue
expectSubstring(t, body, `rabbitmq_queue_messages_ready{durable="true",policy="ha-2",queue="myQueue2",self="1",vhost="/"} 25`)
expectSubstring(t, body, `rabbitmq_queue_memory{durable="true",policy="",queue="myQueue4",self="1",vhost="vhost4"} 13912`)
expectSubstring(t, body, `rabbitmq_queue_messages_published_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 6`)
expectSubstring(t, body, `rabbitmq_queue_disk_writes_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 6`)
expectSubstring(t, body, `rabbitmq_queue_messages_delivered_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 0`)
expectSubstring(t, body, `rabbitmq_queue_messages_ready{cluster="my-rabbit@ae74c041248b",durable="true",policy="ha-2",queue="myQueue2",self="1",vhost="/"} 25`)
expectSubstring(t, body, `rabbitmq_queue_memory{cluster="my-rabbit@ae74c041248b",durable="true",policy="",queue="myQueue4",self="1",vhost="vhost4"} 13912`)
expectSubstring(t, body, `rabbitmq_queue_messages_published_total{cluster="my-rabbit@ae74c041248b",durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 6`)
expectSubstring(t, body, `rabbitmq_queue_disk_writes_total{cluster="my-rabbit@ae74c041248b",durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 6`)
expectSubstring(t, body, `rabbitmq_queue_messages_delivered_total{cluster="my-rabbit@ae74c041248b",durable="true",policy="",queue="myQueue1",self="1",vhost="/"} 0`)

// exchange
expectSubstring(t, body, `rabbitmq_exchange_messages_published_in_total{exchange="myExchange",vhost="/"} 5`)
Expand Down Expand Up @@ -358,11 +358,11 @@ func TestResetMetricsOnRabbitFailure(t *testing.T) {
expectSubstring(t, body, `rabbitmq_partitions{node="my-rabbit@5a00cd8fe2f4",self="0"} 4`)

// queue
dontExpectSubstring(t, body, `rabbitmq_queue_messages_ready{durable="true",policy="ha-2",queue="myQueue2",self="1",vhost="/"}`)
dontExpectSubstring(t, body, `rabbitmq_queue_memory{durable="true",policy="",queue="myQueue4",self="1",vhost="vhost4"}`)
dontExpectSubstring(t, body, `rabbitmq_queue_messages_published_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"}`)
dontExpectSubstring(t, body, `rabbitmq_queue_disk_writes_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"}`)
dontExpectSubstring(t, body, `rabbitmq_queue_messages_delivered_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"}`)
dontExpectSubstring(t, body, `rabbitmq_queue_messages_ready{cluster="my-rabbit@ae74c041248b",durable="true",policy="ha-2",queue="myQueue2",self="1",vhost="/"}`)
dontExpectSubstring(t, body, `rabbitmq_queue_memory{cluster="my-rabbit@ae74c041248b",durable="true",policy="",queue="myQueue4",self="1",vhost="vhost4"}`)
dontExpectSubstring(t, body, `rabbitmq_queue_messages_published_total{cluster="my-rabbit@ae74c041248b",durable="true",policy="",queue="myQueue1",self="1",vhost="/"}`)
dontExpectSubstring(t, body, `rabbitmq_queue_disk_writes_total{cluster="my-rabbit@ae74c041248b",durable="true",policy="",queue="myQueue1",self="1",vhost="/"}`)
dontExpectSubstring(t, body, `rabbitmq_queue_messages_delivered_total{cluster="my-rabbit@ae74c041248b",durable="true",policy="",queue="myQueue1",self="1",vhost="/"}`)

// exchange
expectSubstring(t, body, `rabbitmq_exchange_messages_published_in_total{exchange="myExchange",vhost="/"} 5`)
Expand Down Expand Up @@ -401,11 +401,11 @@ func TestResetMetricsOnRabbitFailure(t *testing.T) {
dontExpectSubstring(t, body, `rabbitmq_partitions{node="my-rabbit@5a00cd8fe2f4",self="0"} 4`)

// queue
dontExpectSubstring(t, body, `rabbitmq_queue_messages_ready{durable="true",policy="ha-2",queue="myQueue2",self="1",vhost="/"}`)
dontExpectSubstring(t, body, `rabbitmq_queue_memory{durable="true",policy="",queue="myQueue4",self="1",vhost="vhost4"}`)
dontExpectSubstring(t, body, `rabbitmq_queue_messages_published_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"}`)
dontExpectSubstring(t, body, `rabbitmq_queue_disk_writes_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"}`)
dontExpectSubstring(t, body, `rabbitmq_queue_messages_delivered_total{durable="true",policy="",queue="myQueue1",self="1",vhost="/"}`)
dontExpectSubstring(t, body, `rabbitmq_queue_messages_ready{cluster="my-rabbit@ae74c041248b",durable="true",policy="ha-2",queue="myQueue2",self="1",vhost="/"}`)
dontExpectSubstring(t, body, `rabbitmq_queue_memory{cluster="my-rabbit@ae74c041248b",durable="true",policy="",queue="myQueue4",self="1",vhost="vhost4"}`)
dontExpectSubstring(t, body, `rabbitmq_queue_messages_published_total{cluster="my-rabbit@ae74c041248b",durable="true",policy="",queue="myQueue1",self="1",vhost="/"}`)
dontExpectSubstring(t, body, `rabbitmq_queue_disk_writes_total{cluster="my-rabbit@ae74c041248b",durable="true",policy="",queue="myQueue1",self="1",vhost="/"}`)
dontExpectSubstring(t, body, `rabbitmq_queue_messages_delivered_total{cluster="my-rabbit@ae74c041248b",durable="true",policy="",queue="myQueue1",self="1",vhost="/"}`)

// exchange
dontExpectSubstring(t, body, `rabbitmq_exchange_messages_published_in_total{exchange="myExchange",vhost="/"}`)
Expand Down Expand Up @@ -444,9 +444,9 @@ func TestQueueState(t *testing.T) {
expectSubstring(t, body, `rabbitmq_up 1`)

// queue
expectSubstring(t, body, `rabbitmq_queue_state{durable="true",policy="",queue="myQueue1",self="1",state="flow",vhost="/"} 1`)
expectSubstring(t, body, `rabbitmq_queue_state{durable="true",policy="",queue="myQueue3",self="1",state="running",vhost="/"} 1`)
expectSubstring(t, body, `rabbitmq_queue_state{durable="true",policy="ha-2",queue="myQueue2",self="1",state="running",vhost="/"} 1`)
expectSubstring(t, body, `rabbitmq_queue_state{cluster="my-rabbit@ae74c041248b",durable="true",policy="",queue="myQueue1",self="1",state="flow",vhost="/"} 1`)
expectSubstring(t, body, `rabbitmq_queue_state{cluster="my-rabbit@ae74c041248b",durable="true",policy="",queue="myQueue3",self="1",state="running",vhost="/"} 1`)
expectSubstring(t, body, `rabbitmq_queue_state{cluster="my-rabbit@ae74c041248b",durable="true",policy="ha-2",queue="myQueue2",self="1",state="running",vhost="/"} 1`)

// connections
expectSubstring(t, body, `rabbitmq_connection_status{node="rabbit@rmq-cluster-node-04",peer_host="172.31.0.130",self="0",state="running",user="rmq_oms",vhost="/"} 1`)
Expand All @@ -460,7 +460,7 @@ func TestQueueLength(t *testing.T) {
if err != nil {
t.Fatalf("Error reading %s", testdataFile)
}
server := setupServer(t, "", string(queuedata), "", "", "")
server := setupServer(t, `{"node": "rabbit@rabbitmq1","cluster_name": "my-rabbit@ae74c041248b"}`, string(queuedata), "", "", "")
defer server.Close()

os.Setenv("RABBIT_URL", server.URL)
Expand All @@ -484,7 +484,7 @@ func TestQueueLength(t *testing.T) {
t.Log(body)

// queue
expectSubstring(t, body, `rabbitmq_queue_max_length{durable="true",policy="",queue="QueueWithMaxLength55",self="0",vhost="/"} 55`)
expectSubstring(t, body, `rabbitmq_queue_max_length_bytes{durable="true",policy="",queue="QueueWithMaxBytes99",self="0",vhost="/"} 99`)
expectSubstring(t, body, `rabbitmq_queue_max_length{cluster="my-rabbit@ae74c041248b",durable="true",policy="",queue="QueueWithMaxLength55",self="1",vhost="/"} 55`)
expectSubstring(t, body, `rabbitmq_queue_max_length_bytes{cluster="my-rabbit@ae74c041248b",durable="true",policy="",queue="QueueWithMaxBytes99",self="1",vhost="/"} 99`)

}
12 changes: 6 additions & 6 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestQueueCount(t *testing.T) {
t.Run("Ensure there are no queues", func(t *testing.T) {
body := testenv.GetOrDie(exporterURL, 5*time.Second)

r := regexp.MustCompile("rabbitmq_queues.* 0")
r := regexp.MustCompile(`rabbitmq_queues{cluster="rabbit@localtest"} 0`)
if s := r.FindString(body); s == "" {
t.Fatalf("QueueCount 0 not found in body: %v", body)
}
Expand All @@ -52,7 +52,7 @@ func TestQueueCount(t *testing.T) {

body := testenv.GetOrDie(exporterURL, 5*time.Second)

r := regexp.MustCompile("rabbitmq_queues.* 1")
r := regexp.MustCompile(`rabbitmq_queues{cluster="rabbit@localtest"} 1`)
if s := r.FindString(body); s == "" {
// t.Logf("body: %s", body)
t.Fatalf("QueueCount 1 not found ")
Expand All @@ -68,7 +68,7 @@ func TestQueueCount(t *testing.T) {
// log.Println(testenv.GetOrDie(env.ManagementURL()+"/api/queues", 5*time.Second))
body := testenv.GetOrDie(exporterURL, 5*time.Second)

search := fmt.Sprintf(`rabbitmq_queue_head_message_timestamp{durable="true",policy="",queue="%s",self="1",vhost="/"} %1.9e`, queue, float64(timestamp.Unix()))
search := fmt.Sprintf(`rabbitmq_queue_head_message_timestamp{cluster="rabbit@localtest",durable="true",policy="",queue="%s",self="1",vhost="/"} %1.9e`, queue, float64(timestamp.Unix()))
i := strings.Index(body, search)

if i == -1 {
Expand All @@ -84,7 +84,7 @@ func TestQueueCount(t *testing.T) {
time.Sleep(5 * time.Second) // give rabbitmq management plugin a bit of time
body := testenv.GetOrDie(exporterURL, 5*time.Second)

search := fmt.Sprintf(`rabbitmq_queue_messages{durable="true",policy="",queue="%s",self="1",vhost="/"} 0`, queue)
search := fmt.Sprintf(`rabbitmq_queue_messages{cluster="rabbit@localtest",durable="true",policy="",queue="%s",self="1",vhost="/"} 0`, queue)
i := strings.Index(body, search)

if i == -1 {
Expand All @@ -101,7 +101,7 @@ func TestQueueCount(t *testing.T) {

body := testenv.GetOrDie(exporterURL, 5*time.Second)

search := fmt.Sprintf(`rabbitmq_queue_messages{durable="false",policy="",queue="%s",self="1",vhost="/"} 0`, queue)
search := fmt.Sprintf(`rabbitmq_queue_messages{cluster="rabbit@localtest",durable="false",policy="",queue="%s",self="1",vhost="/"} 0`, queue)
i := strings.Index(body, search)

if i == -1 {
Expand All @@ -120,7 +120,7 @@ func TestQueueCount(t *testing.T) {
time.Sleep(10 * time.Second) // give rabbitmq management plugin a bit of time
body := testenv.GetOrDie(exporterURL, 5*time.Second)

search := fmt.Sprintf(`rabbitmq_queue_messages{durable="false",policy="%s",queue="%s",self="1",vhost="/"} 0`, policy, queue)
search := fmt.Sprintf(`rabbitmq_queue_messages{cluster="rabbit@localtest",durable="false",policy="%s",queue="%s",self="1",vhost="/"} 0`, policy, queue)
i := strings.Index(body, search)
if i == -1 {
// t.Log(env.ManagementURL())
Expand Down
Loading

0 comments on commit 4bceb92

Please sign in to comment.