Skip to content

Commit

Permalink
added queue length metrics. Still problems with bert
Browse files Browse the repository at this point in the history
  • Loading branch information
kbudde committed Feb 28, 2019
1 parent fc15966 commit 1f7ef7a
Show file tree
Hide file tree
Showing 6 changed files with 219 additions and 63 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ metric | description
|queue_memory|Bytes of memory consumed by the Erlang process associated with the queue, including stack, heap and internal structures.|
|queue_head_message_timestamp|The timestamp property of the first message in the queue, if present. Timestamps of messages only appear when they are in the paged-in state.|
|queue_status|A metric with a value of constant '1' if the queue is in a certain state. Labels: vhost, queue, *state* (running, flow,..)|
|queue_max_length_bytes|Total body size for ready messages a queue can contain before it starts to drop them from its head.|
|queue_max_length|How many (ready) messages a queue can contain before it starts to drop them from its head.|


#### Queues - Counter

Expand Down
4 changes: 4 additions & 0 deletions bertmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ func TestStatsEquivalence(t *testing.T) {
}
}

func TestNewFile(t *testing.T) {
assertBertStatsEquivalence(t, "queue-max-length", nodeLabelKeys)
}

func TestMetricMapEquivalence(t *testing.T) {
endpoints := []string{"overview"}
versions := []string{"3.6.8", "3.7.0"}
Expand Down
2 changes: 2 additions & 0 deletions exporter_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ var (
"consumer_utilisation": newGaugeVec("queue_consumer_utilisation", "Fraction of the time (between 0.0 and 1.0) that the queue is able to immediately deliver messages to consumers. This can be less than 1.0 if consumers are limited by network congestion or prefetch count.", queueLabels),
"memory": newGaugeVec("queue_memory", "Bytes of memory consumed by the Erlang process associated with the queue, including stack, heap and internal structures.", queueLabels),
"head_message_timestamp": newGaugeVec("queue_head_message_timestamp", "The timestamp property of the first message in the queue, if present. Timestamps of messages only appear when they are in the paged-in state.", queueLabels), //https://github.com/rabbitmq/rabbitmq-server/pull/54
"arguments.x-max-length-bytes": newGaugeVec("queue_max_length_bytes", "Total body size for ready messages a queue can contain before it starts to drop them from its head.", queueLabels),
"arguments.x-max-length": newGaugeVec("queue_max_length", "How many (ready) messages a queue can contain before it starts to drop them from its head.", queueLabels),
}

queueCounterVec = map[string]*prometheus.Desc{
Expand Down
117 changes: 54 additions & 63 deletions exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
Expand Down Expand Up @@ -33,27 +34,33 @@ func dontExpectSubstring(t *testing.T, body string, substr string) {
}
}

func TestWholeApp(t *testing.T) {
func setupServer(t *testing.T, overview, queues, exchange, nodes, connections string) *httptest.Server {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
if r.RequestURI == "/api/overview" {
fmt.Fprintln(w, overviewTestData)
fmt.Fprintln(w, overview)
} else if r.RequestURI == "/api/queues" {
fmt.Fprintln(w, queuesTestData)
fmt.Fprintln(w, queues)
} else if r.RequestURI == "/api/exchanges" {
fmt.Fprintln(w, exchangeAPIResponse)
fmt.Fprintln(w, exchange)
} else if r.RequestURI == "/api/nodes" {
fmt.Fprintln(w, nodesAPIResponse)
fmt.Fprintln(w, nodes)
} else if r.RequestURI == "/api/connections" {
fmt.Fprintln(w, connectionAPIResponse)
fmt.Fprintln(w, connections)
} else {
t.Errorf("Invalid request. URI=%v", r.RequestURI)
fmt.Fprintf(w, "Invalid request. URI=%v", r.RequestURI)
}

}))
return server
}

func TestWholeApp(t *testing.T) {
server := setupServer(t, overviewTestData, queuesTestData, exchangeAPIResponse, nodesAPIResponse, connectionAPIResponse)
defer server.Close()

os.Setenv("RABBIT_URL", server.URL)
defer os.Unsetenv("RABBIT_URL")
os.Setenv("SKIP_QUEUES", "^.*3$")
Expand Down Expand Up @@ -103,26 +110,9 @@ func TestWholeApp(t *testing.T) {
}

func TestWholeAppInverted(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
if r.RequestURI == "/api/overview" {
fmt.Fprintln(w, overviewTestData)
} else if r.RequestURI == "/api/queues" {
fmt.Fprintln(w, queuesTestData)
} else if r.RequestURI == "/api/exchanges" {
fmt.Fprintln(w, exchangeAPIResponse)
} else if r.RequestURI == "/api/nodes" {
fmt.Fprintln(w, nodesAPIResponse)
} else if r.RequestURI == "/api/connections" {
fmt.Fprintln(w, connectionAPIResponse)
} else {
t.Errorf("Invalid request. URI=%v", r.RequestURI)
fmt.Fprintf(w, "Invalid request. URI=%v", r.RequestURI)
}

}))
server := setupServer(t, overviewTestData, queuesTestData, exchangeAPIResponse, nodesAPIResponse, connectionAPIResponse)
defer server.Close()

os.Setenv("RABBIT_URL", server.URL)
os.Setenv("SKIP_QUEUES", "^.*3$")
defer os.Unsetenv("SKIP_QUEUES")
Expand Down Expand Up @@ -170,26 +160,9 @@ func TestWholeAppInverted(t *testing.T) {
}

func TestAppMaxQueues(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
if r.RequestURI == "/api/overview" {
fmt.Fprintln(w, overviewTestData)
} else if r.RequestURI == "/api/queues" {
fmt.Fprintln(w, queuesTestData)
} else if r.RequestURI == "/api/exchanges" {
fmt.Fprintln(w, exchangeAPIResponse)
} else if r.RequestURI == "/api/nodes" {
fmt.Fprintln(w, nodesAPIResponse)
} else if r.RequestURI == "/api/connections" {
fmt.Fprintln(w, connectionAPIResponse)
} else {
t.Errorf("Invalid request. URI=%v", r.RequestURI)
fmt.Fprintf(w, "Invalid request. URI=%v", r.RequestURI)
}

}))
server := setupServer(t, overviewTestData, queuesTestData, exchangeAPIResponse, nodesAPIResponse, connectionAPIResponse)
defer server.Close()

os.Setenv("RABBIT_URL", server.URL)
os.Setenv("SKIP_QUEUES", "^.*3$")
defer os.Unsetenv("SKIP_QUEUES")
Expand Down Expand Up @@ -443,26 +416,9 @@ func TestResetMetricsOnRabbitFailure(t *testing.T) {
}

func TestQueueState(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
if r.RequestURI == "/api/overview" {
fmt.Fprintln(w, overviewTestData)
} else if r.RequestURI == "/api/queues" {
fmt.Fprintln(w, queuesTestData)
} else if r.RequestURI == "/api/exchanges" {
fmt.Fprintln(w, exchangeAPIResponse)
} else if r.RequestURI == "/api/nodes" {
fmt.Fprintln(w, nodesAPIResponse)
} else if r.RequestURI == "/api/connections" {
fmt.Fprintln(w, connectionAPIResponse)
} else {
t.Errorf("Invalid request. URI=%v", r.RequestURI)
fmt.Fprintf(w, "Invalid request. URI=%v", r.RequestURI)
}

}))
server := setupServer(t, overviewTestData, queuesTestData, exchangeAPIResponse, nodesAPIResponse, connectionAPIResponse)
defer server.Close()

os.Setenv("RABBIT_URL", server.URL)
os.Setenv("RABBIT_CAPABILITIES", " ")
defer os.Unsetenv("RABBIT_CAPABILITIES")
Expand Down Expand Up @@ -495,3 +451,38 @@ func TestQueueState(t *testing.T) {
expectSubstring(t, body, `rabbitmq_connection_status{node="my-rabbit@ae74c041248b",peer_host="172.31.0.130",self="1",state="running",user="rmq_oms",vhost="/"} 1`)

}

func TestQueueLength(t *testing.T) {
testdataFile := "testdata/queue-max-length.json"
queuedata, err := ioutil.ReadFile(testdataFile)
if err != nil {
t.Fatalf("Error reading %s", testdataFile)
}
server := setupServer(t, "", string(queuedata), "", "", "")
defer server.Close()

os.Setenv("RABBIT_URL", server.URL)
os.Setenv("RABBIT_CAPABILITIES", " ")
defer os.Unsetenv("RABBIT_CAPABILITIES")
os.Setenv("RABBIT_EXPORTERS", "queue")
defer os.Unsetenv("RABBIT_EXPORTERS")
initConfig()

exporter := newExporter()
prometheus.MustRegister(exporter)
defer prometheus.Unregister(exporter)

req, _ := http.NewRequest("GET", "", nil)
w := httptest.NewRecorder()
prometheus.Handler().ServeHTTP(w, req)
if w.Code != http.StatusOK {
t.Errorf("Home page didn't return %v", http.StatusOK)
}
body := w.Body.String()
t.Log(body)

// queue
expectSubstring(t, body, `rabbitmq_queue_max_length{durable="true",policy="",queue="QueueWithMaxLength55",self="0",vhost="/"} 55`)
expectSubstring(t, body, `rabbitmq_queue_max_length_bytes{durable="true",policy="",queue="QueueWithMaxBytes99",self="0",vhost="/"} 99`)

}
Binary file added testdata/queue-max-length.bert
Binary file not shown.
156 changes: 156 additions & 0 deletions testdata/queue-max-length.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
[
{
"arguments": {
"x-max-length-bytes": 99
},
"auto_delete": false,
"backing_queue_status": {
"avg_ack_egress_rate": 0,
"avg_ack_ingress_rate": 0,
"avg_egress_rate": 0,
"avg_ingress_rate": 0,
"delta": [
"delta",
"undefined",
0,
0,
"undefined"
],
"len": 0,
"mode": "default",
"next_seq_id": 0,
"q1": 0,
"q2": 0,
"q3": 0,
"q4": 0,
"target_ram_count": "infinity"
},
"consumer_utilisation": null,
"consumers": 0,
"durable": true,
"effective_policy_definition": [],
"exclusive": false,
"exclusive_consumer_tag": null,
"garbage_collection": {
"fullsweep_after": 65535,
"max_heap_size": 0,
"min_bin_vheap_size": 46422,
"min_heap_size": 233,
"minor_gcs": 0
},
"head_message_timestamp": null,
"idle_since": "2019-02-28 19:57:53",
"memory": 13872,
"message_bytes": 0,
"message_bytes_paged_out": 0,
"message_bytes_persistent": 0,
"message_bytes_ram": 0,
"message_bytes_ready": 0,
"message_bytes_unacknowledged": 0,
"messages": 0,
"messages_details": {
"rate": 0
},
"messages_paged_out": 0,
"messages_persistent": 0,
"messages_ram": 0,
"messages_ready": 0,
"messages_ready_details": {
"rate": 0
},
"messages_ready_ram": 0,
"messages_unacknowledged": 0,
"messages_unacknowledged_details": {
"rate": 0
},
"messages_unacknowledged_ram": 0,
"name": "QueueWithMaxBytes99",
"node": "rabbit@rabbitmq1",
"operator_policy": null,
"policy": null,
"recoverable_slaves": null,
"reductions": 3861,
"reductions_details": {
"rate": 0
},
"state": "running",
"vhost": "/"
},
{
"arguments": {
"x-max-length": 55
},
"auto_delete": false,
"backing_queue_status": {
"avg_ack_egress_rate": 0,
"avg_ack_ingress_rate": 0,
"avg_egress_rate": 0,
"avg_ingress_rate": 0,
"delta": [
"delta",
"undefined",
0,
0,
"undefined"
],
"len": 0,
"mode": "default",
"next_seq_id": 0,
"q1": 0,
"q2": 0,
"q3": 0,
"q4": 0,
"target_ram_count": "infinity"
},
"consumer_utilisation": null,
"consumers": 0,
"durable": true,
"effective_policy_definition": [],
"exclusive": false,
"exclusive_consumer_tag": null,
"garbage_collection": {
"fullsweep_after": 65535,
"max_heap_size": 0,
"min_bin_vheap_size": 46422,
"min_heap_size": 233,
"minor_gcs": 0
},
"head_message_timestamp": null,
"idle_since": "2019-02-28 19:56:48",
"memory": 13864,
"message_bytes": 0,
"message_bytes_paged_out": 0,
"message_bytes_persistent": 0,
"message_bytes_ram": 0,
"message_bytes_ready": 0,
"message_bytes_unacknowledged": 0,
"messages": 0,
"messages_details": {
"rate": 0
},
"messages_paged_out": 0,
"messages_persistent": 0,
"messages_ram": 0,
"messages_ready": 0,
"messages_ready_details": {
"rate": 0
},
"messages_ready_ram": 0,
"messages_unacknowledged": 0,
"messages_unacknowledged_details": {
"rate": 0
},
"messages_unacknowledged_ram": 0,
"name": "QueueWithMaxLength55",
"node": "rabbit@rabbitmq1",
"operator_policy": null,
"policy": null,
"recoverable_slaves": null,
"reductions": 3873,
"reductions_details": {
"rate": 0
},
"state": "running",
"vhost": "/"
}
]

0 comments on commit 1f7ef7a

Please sign in to comment.