From 1f7ef7a9ac316caf7283dab8494314853a5ee55f Mon Sep 17 00:00:00 2001 From: Kris Budde Date: Thu, 28 Feb 2019 22:31:08 +0100 Subject: [PATCH] added queue length metrics. Still problems with bert --- README.md | 3 + bertmap_test.go | 4 + exporter_queue.go | 2 + exporter_test.go | 117 ++++++++++++------------- testdata/queue-max-length.bert | Bin 0 -> 2974 bytes testdata/queue-max-length.json | 156 +++++++++++++++++++++++++++++++++ 6 files changed, 219 insertions(+), 63 deletions(-) create mode 100644 testdata/queue-max-length.bert create mode 100644 testdata/queue-max-length.json diff --git a/README.md b/README.md index 77dae22..5b9e9ac 100644 --- a/README.md +++ b/README.md @@ -136,6 +136,9 @@ metric | description |queue_memory|Bytes of memory consumed by the Erlang process associated with the queue, including stack, heap and internal structures.| |queue_head_message_timestamp|The timestamp property of the first message in the queue, if present. Timestamps of messages only appear when they are in the paged-in state.| |queue_status|A metric with a value of constant '1' if the queue is in a certain state. Labels: vhost, queue, *state* (running, flow,..)| +|queue_max_length_bytes|Total body size for ready messages a queue can contain before it starts to drop them from its head.| +|queue_max_length|How many (ready) messages a queue can contain before it starts to drop them from its head.| + #### Queues - Counter diff --git a/bertmap_test.go b/bertmap_test.go index 103faed..5fa7598 100644 --- a/bertmap_test.go +++ b/bertmap_test.go @@ -23,6 +23,10 @@ func TestStatsEquivalence(t *testing.T) { } } +func TestNewFile(t *testing.T) { + assertBertStatsEquivalence(t, "queue-max-length", nodeLabelKeys) +} + func TestMetricMapEquivalence(t *testing.T) { endpoints := []string{"overview"} versions := []string{"3.6.8", "3.7.0"} diff --git a/exporter_queue.go b/exporter_queue.go index 6b94b17..ae907f7 100644 --- a/exporter_queue.go +++ b/exporter_queue.go @@ -33,6 +33,8 @@ var ( "consumer_utilisation": newGaugeVec("queue_consumer_utilisation", "Fraction of the time (between 0.0 and 1.0) that the queue is able to immediately deliver messages to consumers. This can be less than 1.0 if consumers are limited by network congestion or prefetch count.", queueLabels), "memory": newGaugeVec("queue_memory", "Bytes of memory consumed by the Erlang process associated with the queue, including stack, heap and internal structures.", queueLabels), "head_message_timestamp": newGaugeVec("queue_head_message_timestamp", "The timestamp property of the first message in the queue, if present. Timestamps of messages only appear when they are in the paged-in state.", queueLabels), //https://github.com/rabbitmq/rabbitmq-server/pull/54 + "arguments.x-max-length-bytes": newGaugeVec("queue_max_length_bytes", "Total body size for ready messages a queue can contain before it starts to drop them from its head.", queueLabels), + "arguments.x-max-length": newGaugeVec("queue_max_length", "How many (ready) messages a queue can contain before it starts to drop them from its head.", queueLabels), } queueCounterVec = map[string]*prometheus.Desc{ diff --git a/exporter_test.go b/exporter_test.go index 85e80bc..042dd27 100644 --- a/exporter_test.go +++ b/exporter_test.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "io/ioutil" "net/http" "net/http/httptest" "os" @@ -33,27 +34,33 @@ func dontExpectSubstring(t *testing.T, body string, substr string) { } } -func TestWholeApp(t *testing.T) { +func setupServer(t *testing.T, overview, queues, exchange, nodes, connections string) *httptest.Server { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Header().Set("Content-Type", "application/json") if r.RequestURI == "/api/overview" { - fmt.Fprintln(w, overviewTestData) + fmt.Fprintln(w, overview) } else if r.RequestURI == "/api/queues" { - fmt.Fprintln(w, queuesTestData) + fmt.Fprintln(w, queues) } else if r.RequestURI == "/api/exchanges" { - fmt.Fprintln(w, exchangeAPIResponse) + fmt.Fprintln(w, exchange) } else if r.RequestURI == "/api/nodes" { - fmt.Fprintln(w, nodesAPIResponse) + fmt.Fprintln(w, nodes) } else if r.RequestURI == "/api/connections" { - fmt.Fprintln(w, connectionAPIResponse) + fmt.Fprintln(w, connections) } else { t.Errorf("Invalid request. URI=%v", r.RequestURI) fmt.Fprintf(w, "Invalid request. URI=%v", r.RequestURI) } })) + return server +} + +func TestWholeApp(t *testing.T) { + server := setupServer(t, overviewTestData, queuesTestData, exchangeAPIResponse, nodesAPIResponse, connectionAPIResponse) defer server.Close() + os.Setenv("RABBIT_URL", server.URL) defer os.Unsetenv("RABBIT_URL") os.Setenv("SKIP_QUEUES", "^.*3$") @@ -103,26 +110,9 @@ func TestWholeApp(t *testing.T) { } func TestWholeAppInverted(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - w.Header().Set("Content-Type", "application/json") - if r.RequestURI == "/api/overview" { - fmt.Fprintln(w, overviewTestData) - } else if r.RequestURI == "/api/queues" { - fmt.Fprintln(w, queuesTestData) - } else if r.RequestURI == "/api/exchanges" { - fmt.Fprintln(w, exchangeAPIResponse) - } else if r.RequestURI == "/api/nodes" { - fmt.Fprintln(w, nodesAPIResponse) - } else if r.RequestURI == "/api/connections" { - fmt.Fprintln(w, connectionAPIResponse) - } else { - t.Errorf("Invalid request. URI=%v", r.RequestURI) - fmt.Fprintf(w, "Invalid request. URI=%v", r.RequestURI) - } - - })) + server := setupServer(t, overviewTestData, queuesTestData, exchangeAPIResponse, nodesAPIResponse, connectionAPIResponse) defer server.Close() + os.Setenv("RABBIT_URL", server.URL) os.Setenv("SKIP_QUEUES", "^.*3$") defer os.Unsetenv("SKIP_QUEUES") @@ -170,26 +160,9 @@ func TestWholeAppInverted(t *testing.T) { } func TestAppMaxQueues(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - w.Header().Set("Content-Type", "application/json") - if r.RequestURI == "/api/overview" { - fmt.Fprintln(w, overviewTestData) - } else if r.RequestURI == "/api/queues" { - fmt.Fprintln(w, queuesTestData) - } else if r.RequestURI == "/api/exchanges" { - fmt.Fprintln(w, exchangeAPIResponse) - } else if r.RequestURI == "/api/nodes" { - fmt.Fprintln(w, nodesAPIResponse) - } else if r.RequestURI == "/api/connections" { - fmt.Fprintln(w, connectionAPIResponse) - } else { - t.Errorf("Invalid request. URI=%v", r.RequestURI) - fmt.Fprintf(w, "Invalid request. URI=%v", r.RequestURI) - } - - })) + server := setupServer(t, overviewTestData, queuesTestData, exchangeAPIResponse, nodesAPIResponse, connectionAPIResponse) defer server.Close() + os.Setenv("RABBIT_URL", server.URL) os.Setenv("SKIP_QUEUES", "^.*3$") defer os.Unsetenv("SKIP_QUEUES") @@ -443,26 +416,9 @@ func TestResetMetricsOnRabbitFailure(t *testing.T) { } func TestQueueState(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - w.Header().Set("Content-Type", "application/json") - if r.RequestURI == "/api/overview" { - fmt.Fprintln(w, overviewTestData) - } else if r.RequestURI == "/api/queues" { - fmt.Fprintln(w, queuesTestData) - } else if r.RequestURI == "/api/exchanges" { - fmt.Fprintln(w, exchangeAPIResponse) - } else if r.RequestURI == "/api/nodes" { - fmt.Fprintln(w, nodesAPIResponse) - } else if r.RequestURI == "/api/connections" { - fmt.Fprintln(w, connectionAPIResponse) - } else { - t.Errorf("Invalid request. URI=%v", r.RequestURI) - fmt.Fprintf(w, "Invalid request. URI=%v", r.RequestURI) - } - - })) + server := setupServer(t, overviewTestData, queuesTestData, exchangeAPIResponse, nodesAPIResponse, connectionAPIResponse) defer server.Close() + os.Setenv("RABBIT_URL", server.URL) os.Setenv("RABBIT_CAPABILITIES", " ") defer os.Unsetenv("RABBIT_CAPABILITIES") @@ -495,3 +451,38 @@ func TestQueueState(t *testing.T) { expectSubstring(t, body, `rabbitmq_connection_status{node="my-rabbit@ae74c041248b",peer_host="172.31.0.130",self="1",state="running",user="rmq_oms",vhost="/"} 1`) } + +func TestQueueLength(t *testing.T) { + testdataFile := "testdata/queue-max-length.json" + queuedata, err := ioutil.ReadFile(testdataFile) + if err != nil { + t.Fatalf("Error reading %s", testdataFile) + } + server := setupServer(t, "", string(queuedata), "", "", "") + defer server.Close() + + os.Setenv("RABBIT_URL", server.URL) + os.Setenv("RABBIT_CAPABILITIES", " ") + defer os.Unsetenv("RABBIT_CAPABILITIES") + os.Setenv("RABBIT_EXPORTERS", "queue") + defer os.Unsetenv("RABBIT_EXPORTERS") + initConfig() + + exporter := newExporter() + prometheus.MustRegister(exporter) + defer prometheus.Unregister(exporter) + + req, _ := http.NewRequest("GET", "", nil) + w := httptest.NewRecorder() + prometheus.Handler().ServeHTTP(w, req) + if w.Code != http.StatusOK { + t.Errorf("Home page didn't return %v", http.StatusOK) + } + body := w.Body.String() + t.Log(body) + + // queue + expectSubstring(t, body, `rabbitmq_queue_max_length{durable="true",policy="",queue="QueueWithMaxLength55",self="0",vhost="/"} 55`) + expectSubstring(t, body, `rabbitmq_queue_max_length_bytes{durable="true",policy="",queue="QueueWithMaxBytes99",self="0",vhost="/"} 99`) + +} diff --git a/testdata/queue-max-length.bert b/testdata/queue-max-length.bert new file mode 100644 index 0000000000000000000000000000000000000000..7fd4fef422a25d29fd7bc059b6dcce93392bfa10 GIT binary patch literal 2974 zcmeHJOHWfl6uw{!s987zQ6eug>!j&%E=yEc>_w;t^+_~*MO6%6&5KE z2D`19vqB2Ptw>cA1pXlk%y56-jGX3uiMdp)cD~OCFc?5+#T3{$l_H$uTX^T_W#oOp zER@cm!s7Gnmr_hBk8gRL$8E%1_jwG6NlD&Q<@-kdY6B;wIR_XsKCqzsL2z+Fzzz-w z8v-9QBfdhGa3c|l_3?LF0;vD}Q>KsdQp|t&WKloWEDo_?pZz{6L@dE6p>(XkzbsG2 zd7k$~ByqNc3W0`nx6^9(nypT=bNam9JL{gDbq~qwMgmEy=NS38l||0|WpnIX`?!eV zWo(S$B9=Ov)$wEn1#N_}xFH$*a2dv@&g8o*a;#;ZAy1bPr*IuqdX7(vaWj>zTqS?| zdU3@Ewp>UbWY~D6hV9a%O#i{9=rZ&DW#R8-qsb4gLRMnY zbi=M{z_T*aGJ2Dc2iMt_u`7z>SkTmbB!>Dd?C=Ahy(MPr^?X~E-}*XkTwmBX*^x}z zgw;}PAGbKF+0y0nxUqC=Xb96O>fy#RBl#3eF}E5b=hw>MP}UxiOVlo8Hz}}6j7p@W zol?knc~d&}duJpQlp`0?4%Db45tkH4b(PQK*qmd7u@ fi~d_4-&^GI_x>N%ZkNWS^4Ey`1%><5L_Ycn9e<)i literal 0 HcmV?d00001 diff --git a/testdata/queue-max-length.json b/testdata/queue-max-length.json new file mode 100644 index 0000000..860cde1 --- /dev/null +++ b/testdata/queue-max-length.json @@ -0,0 +1,156 @@ +[ + { + "arguments": { + "x-max-length-bytes": 99 + }, + "auto_delete": false, + "backing_queue_status": { + "avg_ack_egress_rate": 0, + "avg_ack_ingress_rate": 0, + "avg_egress_rate": 0, + "avg_ingress_rate": 0, + "delta": [ + "delta", + "undefined", + 0, + 0, + "undefined" + ], + "len": 0, + "mode": "default", + "next_seq_id": 0, + "q1": 0, + "q2": 0, + "q3": 0, + "q4": 0, + "target_ram_count": "infinity" + }, + "consumer_utilisation": null, + "consumers": 0, + "durable": true, + "effective_policy_definition": [], + "exclusive": false, + "exclusive_consumer_tag": null, + "garbage_collection": { + "fullsweep_after": 65535, + "max_heap_size": 0, + "min_bin_vheap_size": 46422, + "min_heap_size": 233, + "minor_gcs": 0 + }, + "head_message_timestamp": null, + "idle_since": "2019-02-28 19:57:53", + "memory": 13872, + "message_bytes": 0, + "message_bytes_paged_out": 0, + "message_bytes_persistent": 0, + "message_bytes_ram": 0, + "message_bytes_ready": 0, + "message_bytes_unacknowledged": 0, + "messages": 0, + "messages_details": { + "rate": 0 + }, + "messages_paged_out": 0, + "messages_persistent": 0, + "messages_ram": 0, + "messages_ready": 0, + "messages_ready_details": { + "rate": 0 + }, + "messages_ready_ram": 0, + "messages_unacknowledged": 0, + "messages_unacknowledged_details": { + "rate": 0 + }, + "messages_unacknowledged_ram": 0, + "name": "QueueWithMaxBytes99", + "node": "rabbit@rabbitmq1", + "operator_policy": null, + "policy": null, + "recoverable_slaves": null, + "reductions": 3861, + "reductions_details": { + "rate": 0 + }, + "state": "running", + "vhost": "/" + }, + { + "arguments": { + "x-max-length": 55 + }, + "auto_delete": false, + "backing_queue_status": { + "avg_ack_egress_rate": 0, + "avg_ack_ingress_rate": 0, + "avg_egress_rate": 0, + "avg_ingress_rate": 0, + "delta": [ + "delta", + "undefined", + 0, + 0, + "undefined" + ], + "len": 0, + "mode": "default", + "next_seq_id": 0, + "q1": 0, + "q2": 0, + "q3": 0, + "q4": 0, + "target_ram_count": "infinity" + }, + "consumer_utilisation": null, + "consumers": 0, + "durable": true, + "effective_policy_definition": [], + "exclusive": false, + "exclusive_consumer_tag": null, + "garbage_collection": { + "fullsweep_after": 65535, + "max_heap_size": 0, + "min_bin_vheap_size": 46422, + "min_heap_size": 233, + "minor_gcs": 0 + }, + "head_message_timestamp": null, + "idle_since": "2019-02-28 19:56:48", + "memory": 13864, + "message_bytes": 0, + "message_bytes_paged_out": 0, + "message_bytes_persistent": 0, + "message_bytes_ram": 0, + "message_bytes_ready": 0, + "message_bytes_unacknowledged": 0, + "messages": 0, + "messages_details": { + "rate": 0 + }, + "messages_paged_out": 0, + "messages_persistent": 0, + "messages_ram": 0, + "messages_ready": 0, + "messages_ready_details": { + "rate": 0 + }, + "messages_ready_ram": 0, + "messages_unacknowledged": 0, + "messages_unacknowledged_details": { + "rate": 0 + }, + "messages_unacknowledged_ram": 0, + "name": "QueueWithMaxLength55", + "node": "rabbit@rabbitmq1", + "operator_policy": null, + "policy": null, + "recoverable_slaves": null, + "reductions": 3873, + "reductions_details": { + "rate": 0 + }, + "state": "running", + "vhost": "/" + } +] \ No newline at end of file