diff --git a/README.md b/README.md index 77dae22..5b9e9ac 100644 --- a/README.md +++ b/README.md @@ -136,6 +136,9 @@ metric | description |queue_memory|Bytes of memory consumed by the Erlang process associated with the queue, including stack, heap and internal structures.| |queue_head_message_timestamp|The timestamp property of the first message in the queue, if present. Timestamps of messages only appear when they are in the paged-in state.| |queue_status|A metric with a value of constant '1' if the queue is in a certain state. Labels: vhost, queue, *state* (running, flow,..)| +|queue_max_length_bytes|Total body size for ready messages a queue can contain before it starts to drop them from its head.| +|queue_max_length|How many (ready) messages a queue can contain before it starts to drop them from its head.| + #### Queues - Counter diff --git a/bertmap_test.go b/bertmap_test.go index 103faed..5fa7598 100644 --- a/bertmap_test.go +++ b/bertmap_test.go @@ -23,6 +23,10 @@ func TestStatsEquivalence(t *testing.T) { } } +func TestNewFile(t *testing.T) { + assertBertStatsEquivalence(t, "queue-max-length", nodeLabelKeys) +} + func TestMetricMapEquivalence(t *testing.T) { endpoints := []string{"overview"} versions := []string{"3.6.8", "3.7.0"} diff --git a/exporter_queue.go b/exporter_queue.go index 6b94b17..ae907f7 100644 --- a/exporter_queue.go +++ b/exporter_queue.go @@ -33,6 +33,8 @@ var ( "consumer_utilisation": newGaugeVec("queue_consumer_utilisation", "Fraction of the time (between 0.0 and 1.0) that the queue is able to immediately deliver messages to consumers. This can be less than 1.0 if consumers are limited by network congestion or prefetch count.", queueLabels), "memory": newGaugeVec("queue_memory", "Bytes of memory consumed by the Erlang process associated with the queue, including stack, heap and internal structures.", queueLabels), "head_message_timestamp": newGaugeVec("queue_head_message_timestamp", "The timestamp property of the first message in the queue, if present. Timestamps of messages only appear when they are in the paged-in state.", queueLabels), //https://github.com/rabbitmq/rabbitmq-server/pull/54 + "arguments.x-max-length-bytes": newGaugeVec("queue_max_length_bytes", "Total body size for ready messages a queue can contain before it starts to drop them from its head.", queueLabels), + "arguments.x-max-length": newGaugeVec("queue_max_length", "How many (ready) messages a queue can contain before it starts to drop them from its head.", queueLabels), } queueCounterVec = map[string]*prometheus.Desc{ diff --git a/exporter_test.go b/exporter_test.go index 85e80bc..042dd27 100644 --- a/exporter_test.go +++ b/exporter_test.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "io/ioutil" "net/http" "net/http/httptest" "os" @@ -33,27 +34,33 @@ func dontExpectSubstring(t *testing.T, body string, substr string) { } } -func TestWholeApp(t *testing.T) { +func setupServer(t *testing.T, overview, queues, exchange, nodes, connections string) *httptest.Server { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Header().Set("Content-Type", "application/json") if r.RequestURI == "/api/overview" { - fmt.Fprintln(w, overviewTestData) + fmt.Fprintln(w, overview) } else if r.RequestURI == "/api/queues" { - fmt.Fprintln(w, queuesTestData) + fmt.Fprintln(w, queues) } else if r.RequestURI == "/api/exchanges" { - fmt.Fprintln(w, exchangeAPIResponse) + fmt.Fprintln(w, exchange) } else if r.RequestURI == "/api/nodes" { - fmt.Fprintln(w, nodesAPIResponse) + fmt.Fprintln(w, nodes) } else if r.RequestURI == "/api/connections" { - fmt.Fprintln(w, connectionAPIResponse) + fmt.Fprintln(w, connections) } else { t.Errorf("Invalid request. URI=%v", r.RequestURI) fmt.Fprintf(w, "Invalid request. URI=%v", r.RequestURI) } })) + return server +} + +func TestWholeApp(t *testing.T) { + server := setupServer(t, overviewTestData, queuesTestData, exchangeAPIResponse, nodesAPIResponse, connectionAPIResponse) defer server.Close() + os.Setenv("RABBIT_URL", server.URL) defer os.Unsetenv("RABBIT_URL") os.Setenv("SKIP_QUEUES", "^.*3$") @@ -103,26 +110,9 @@ func TestWholeApp(t *testing.T) { } func TestWholeAppInverted(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - w.Header().Set("Content-Type", "application/json") - if r.RequestURI == "/api/overview" { - fmt.Fprintln(w, overviewTestData) - } else if r.RequestURI == "/api/queues" { - fmt.Fprintln(w, queuesTestData) - } else if r.RequestURI == "/api/exchanges" { - fmt.Fprintln(w, exchangeAPIResponse) - } else if r.RequestURI == "/api/nodes" { - fmt.Fprintln(w, nodesAPIResponse) - } else if r.RequestURI == "/api/connections" { - fmt.Fprintln(w, connectionAPIResponse) - } else { - t.Errorf("Invalid request. URI=%v", r.RequestURI) - fmt.Fprintf(w, "Invalid request. URI=%v", r.RequestURI) - } - - })) + server := setupServer(t, overviewTestData, queuesTestData, exchangeAPIResponse, nodesAPIResponse, connectionAPIResponse) defer server.Close() + os.Setenv("RABBIT_URL", server.URL) os.Setenv("SKIP_QUEUES", "^.*3$") defer os.Unsetenv("SKIP_QUEUES") @@ -170,26 +160,9 @@ func TestWholeAppInverted(t *testing.T) { } func TestAppMaxQueues(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - w.Header().Set("Content-Type", "application/json") - if r.RequestURI == "/api/overview" { - fmt.Fprintln(w, overviewTestData) - } else if r.RequestURI == "/api/queues" { - fmt.Fprintln(w, queuesTestData) - } else if r.RequestURI == "/api/exchanges" { - fmt.Fprintln(w, exchangeAPIResponse) - } else if r.RequestURI == "/api/nodes" { - fmt.Fprintln(w, nodesAPIResponse) - } else if r.RequestURI == "/api/connections" { - fmt.Fprintln(w, connectionAPIResponse) - } else { - t.Errorf("Invalid request. URI=%v", r.RequestURI) - fmt.Fprintf(w, "Invalid request. URI=%v", r.RequestURI) - } - - })) + server := setupServer(t, overviewTestData, queuesTestData, exchangeAPIResponse, nodesAPIResponse, connectionAPIResponse) defer server.Close() + os.Setenv("RABBIT_URL", server.URL) os.Setenv("SKIP_QUEUES", "^.*3$") defer os.Unsetenv("SKIP_QUEUES") @@ -443,26 +416,9 @@ func TestResetMetricsOnRabbitFailure(t *testing.T) { } func TestQueueState(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - w.Header().Set("Content-Type", "application/json") - if r.RequestURI == "/api/overview" { - fmt.Fprintln(w, overviewTestData) - } else if r.RequestURI == "/api/queues" { - fmt.Fprintln(w, queuesTestData) - } else if r.RequestURI == "/api/exchanges" { - fmt.Fprintln(w, exchangeAPIResponse) - } else if r.RequestURI == "/api/nodes" { - fmt.Fprintln(w, nodesAPIResponse) - } else if r.RequestURI == "/api/connections" { - fmt.Fprintln(w, connectionAPIResponse) - } else { - t.Errorf("Invalid request. URI=%v", r.RequestURI) - fmt.Fprintf(w, "Invalid request. URI=%v", r.RequestURI) - } - - })) + server := setupServer(t, overviewTestData, queuesTestData, exchangeAPIResponse, nodesAPIResponse, connectionAPIResponse) defer server.Close() + os.Setenv("RABBIT_URL", server.URL) os.Setenv("RABBIT_CAPABILITIES", " ") defer os.Unsetenv("RABBIT_CAPABILITIES") @@ -495,3 +451,38 @@ func TestQueueState(t *testing.T) { expectSubstring(t, body, `rabbitmq_connection_status{node="my-rabbit@ae74c041248b",peer_host="172.31.0.130",self="1",state="running",user="rmq_oms",vhost="/"} 1`) } + +func TestQueueLength(t *testing.T) { + testdataFile := "testdata/queue-max-length.json" + queuedata, err := ioutil.ReadFile(testdataFile) + if err != nil { + t.Fatalf("Error reading %s", testdataFile) + } + server := setupServer(t, "", string(queuedata), "", "", "") + defer server.Close() + + os.Setenv("RABBIT_URL", server.URL) + os.Setenv("RABBIT_CAPABILITIES", " ") + defer os.Unsetenv("RABBIT_CAPABILITIES") + os.Setenv("RABBIT_EXPORTERS", "queue") + defer os.Unsetenv("RABBIT_EXPORTERS") + initConfig() + + exporter := newExporter() + prometheus.MustRegister(exporter) + defer prometheus.Unregister(exporter) + + req, _ := http.NewRequest("GET", "", nil) + w := httptest.NewRecorder() + prometheus.Handler().ServeHTTP(w, req) + if w.Code != http.StatusOK { + t.Errorf("Home page didn't return %v", http.StatusOK) + } + body := w.Body.String() + t.Log(body) + + // queue + expectSubstring(t, body, `rabbitmq_queue_max_length{durable="true",policy="",queue="QueueWithMaxLength55",self="0",vhost="/"} 55`) + expectSubstring(t, body, `rabbitmq_queue_max_length_bytes{durable="true",policy="",queue="QueueWithMaxBytes99",self="0",vhost="/"} 99`) + +} diff --git a/testdata/queue-max-length.bert b/testdata/queue-max-length.bert new file mode 100644 index 0000000..7fd4fef Binary files /dev/null and b/testdata/queue-max-length.bert differ diff --git a/testdata/queue-max-length.json b/testdata/queue-max-length.json new file mode 100644 index 0000000..860cde1 --- /dev/null +++ b/testdata/queue-max-length.json @@ -0,0 +1,156 @@ +[ + { + "arguments": { + "x-max-length-bytes": 99 + }, + "auto_delete": false, + "backing_queue_status": { + "avg_ack_egress_rate": 0, + "avg_ack_ingress_rate": 0, + "avg_egress_rate": 0, + "avg_ingress_rate": 0, + "delta": [ + "delta", + "undefined", + 0, + 0, + "undefined" + ], + "len": 0, + "mode": "default", + "next_seq_id": 0, + "q1": 0, + "q2": 0, + "q3": 0, + "q4": 0, + "target_ram_count": "infinity" + }, + "consumer_utilisation": null, + "consumers": 0, + "durable": true, + "effective_policy_definition": [], + "exclusive": false, + "exclusive_consumer_tag": null, + "garbage_collection": { + "fullsweep_after": 65535, + "max_heap_size": 0, + "min_bin_vheap_size": 46422, + "min_heap_size": 233, + "minor_gcs": 0 + }, + "head_message_timestamp": null, + "idle_since": "2019-02-28 19:57:53", + "memory": 13872, + "message_bytes": 0, + "message_bytes_paged_out": 0, + "message_bytes_persistent": 0, + "message_bytes_ram": 0, + "message_bytes_ready": 0, + "message_bytes_unacknowledged": 0, + "messages": 0, + "messages_details": { + "rate": 0 + }, + "messages_paged_out": 0, + "messages_persistent": 0, + "messages_ram": 0, + "messages_ready": 0, + "messages_ready_details": { + "rate": 0 + }, + "messages_ready_ram": 0, + "messages_unacknowledged": 0, + "messages_unacknowledged_details": { + "rate": 0 + }, + "messages_unacknowledged_ram": 0, + "name": "QueueWithMaxBytes99", + "node": "rabbit@rabbitmq1", + "operator_policy": null, + "policy": null, + "recoverable_slaves": null, + "reductions": 3861, + "reductions_details": { + "rate": 0 + }, + "state": "running", + "vhost": "/" + }, + { + "arguments": { + "x-max-length": 55 + }, + "auto_delete": false, + "backing_queue_status": { + "avg_ack_egress_rate": 0, + "avg_ack_ingress_rate": 0, + "avg_egress_rate": 0, + "avg_ingress_rate": 0, + "delta": [ + "delta", + "undefined", + 0, + 0, + "undefined" + ], + "len": 0, + "mode": "default", + "next_seq_id": 0, + "q1": 0, + "q2": 0, + "q3": 0, + "q4": 0, + "target_ram_count": "infinity" + }, + "consumer_utilisation": null, + "consumers": 0, + "durable": true, + "effective_policy_definition": [], + "exclusive": false, + "exclusive_consumer_tag": null, + "garbage_collection": { + "fullsweep_after": 65535, + "max_heap_size": 0, + "min_bin_vheap_size": 46422, + "min_heap_size": 233, + "minor_gcs": 0 + }, + "head_message_timestamp": null, + "idle_since": "2019-02-28 19:56:48", + "memory": 13864, + "message_bytes": 0, + "message_bytes_paged_out": 0, + "message_bytes_persistent": 0, + "message_bytes_ram": 0, + "message_bytes_ready": 0, + "message_bytes_unacknowledged": 0, + "messages": 0, + "messages_details": { + "rate": 0 + }, + "messages_paged_out": 0, + "messages_persistent": 0, + "messages_ram": 0, + "messages_ready": 0, + "messages_ready_details": { + "rate": 0 + }, + "messages_ready_ram": 0, + "messages_unacknowledged": 0, + "messages_unacknowledged_details": { + "rate": 0 + }, + "messages_unacknowledged_ram": 0, + "name": "QueueWithMaxLength55", + "node": "rabbit@rabbitmq1", + "operator_policy": null, + "policy": null, + "recoverable_slaves": null, + "reductions": 3873, + "reductions_details": { + "rate": 0 + }, + "state": "running", + "vhost": "/" + } +] \ No newline at end of file