diff --git a/exporter_queue.go b/exporter_queue.go
index 399084d..4fc9305 100644
--- a/exporter_queue.go
+++ b/exporter_queue.go
@@ -34,8 +34,6 @@ var (
         "consumer_utilisation": newGaugeVec("queue_consumer_utilisation", "Fraction of the time (between 0.0 and 1.0) that the queue is able to immediately deliver messages to consumers. This can be less than 1.0 if consumers are limited by network congestion or prefetch count.", queueLabels),
         "memory": newGaugeVec("queue_memory", "Bytes of memory consumed by the Erlang process associated with the queue, including stack, heap and internal structures.", queueLabels),
         "head_message_timestamp": newGaugeVec("queue_head_message_timestamp", "The timestamp property of the first message in the queue, if present. Timestamps of messages only appear when they are in the paged-in state.", queueLabels), //https://github.com/rabbitmq/rabbitmq-server/pull/54
-        "arguments.x-max-length-bytes": newGaugeVec("queue_max_length_bytes", "Total body size for ready messages a queue can contain before it starts to drop them from its head.", queueLabels),
-        "arguments.x-max-length": newGaugeVec("queue_max_length", "How many (ready) messages a queue can contain before it starts to drop them from its head.", queueLabels),
         "garbage_collection.min_heap_size": newGaugeVec("queue_gc_min_heap", "Minimum heap size in words", queueLabels),
         "garbage_collection.min_bin_vheap_size": newGaugeVec("queue_gc_min_vheap", "Minimum binary virtual heap size in words", queueLabels),
         "garbage_collection.fullsweep_after": newGaugeVec("queue_gc_collections_before_fullsweep", "Maximum generational collections before fullsweep", queueLabels),
@@ -45,6 +43,10 @@ var (
         "message_stats.deliver_no_ack_details.rate": newGaugeVec("queue_messages_deliver_no_ack_rate", "Rate at which messages are delivered to consumers that use automatic acknowledgements.", queueLabels),
         "message_stats.deliver_details.rate": newGaugeVec("queue_messages_deliver_rate", "Rate at which messages are delivered to consumers that use manual acknowledgements.", queueLabels),
     }
+    limitsGaugeVec = map[string]*prometheus.GaugeVec{
+        "max-length-bytes": newGaugeVec("queue_max_length_bytes", "Total body size for ready messages a queue can contain before it starts to drop them from its head.", queueLabels),
+        "max-length":       newGaugeVec("queue_max_length", "How many (ready) messages a queue can contain before it starts to drop them from its head.", queueLabels),
+    }

     queueCounterVec = map[string]*prometheus.Desc{
         "disk_reads": newDesc("queue_disk_reads_total", "Total number of times messages have been read from disk by this queue since it started.", queueLabels),
@@ -64,6 +66,7 @@ var (
 )

 type exporterQueue struct {
+    limitsGauge         map[string]*prometheus.GaugeVec
     queueMetricsGauge   map[string]*prometheus.GaugeVec
     queueMetricsCounter map[string]*prometheus.Desc
     stateMetric         *prometheus.GaugeVec
@@ -73,6 +76,7 @@ type exporterQueue struct {
 func newExporterQueue() Exporter {
     queueGaugeVecActual := queueGaugeVec
     queueCounterVecActual := queueCounterVec
+    limitsGaugeVecActual := limitsGaugeVec

     if len(config.ExcludeMetrics) > 0 {
         for _, metric := range config.ExcludeMetrics {
@@ -82,10 +86,14 @@ func newExporterQueue() Exporter {
             if queueCounterVecActual[metric] != nil {
                 delete(queueCounterVecActual, metric)
             }
+            if limitsGaugeVecActual[metric] != nil {
+                delete(limitsGaugeVecActual, metric)
+            }
         }
     }

     return exporterQueue{
+        limitsGauge:         limitsGaugeVecActual,
         queueMetricsGauge:   queueGaugeVecActual,
         queueMetricsCounter: queueCounterVecActual,
stateMetric: newGaugeVec("queue_state", "A metric with a value of constant '1' if the queue is in a certain state", append(queueLabels, "state")), @@ -93,10 +101,33 @@ func newExporterQueue() Exporter { } } +func collectLowerMetric(metricA, metricB string, stats StatsInfo) float64 { + mA, okA := stats.metrics[metricA] + mB, okB := stats.metrics[metricB] + + if okA && okB { + if mA < mB { + return mA + } else { + return mB + } + } + if okA { + return mA + } + if okB { + return mB + } + return -1.0 +} + func (e exporterQueue) Collect(ctx context.Context, ch chan<- prometheus.Metric) error { for _, gaugevec := range e.queueMetricsGauge { gaugevec.Reset() } + for _, m := range e.limitsGauge { + m.Reset() + } e.stateMetric.Reset() e.idleSinceMetric.Reset() @@ -126,40 +157,14 @@ func (e exporterQueue) Collect(ctx context.Context, ch chan<- prometheus.Metric) } rabbitMqQueueData, err := getStatsInfo(config, "queues", queueLabelKeys) - if err != nil { return err } log.WithField("queueData", rabbitMqQueueData).Debug("Queue data") - for key, gaugevec := range e.queueMetricsGauge { - for _, queue := range rabbitMqQueueData { - qname := queue.labels["name"] - vname := queue.labels["vhost"] - if value, ok := queue.metrics[key]; ok { - - if matchVhost := config.IncludeVHost.MatchString(vname); matchVhost { - if skipVhost := config.SkipVHost.MatchString(vname); !skipVhost { - if matchInclude := config.IncludeQueues.MatchString(qname); matchInclude { - if matchSkip := config.SkipQueues.MatchString(qname); !matchSkip { - self := "0" - if queue.labels["node"] == selfNode { - self = "1" - } - // log.WithFields(log.Fields{"vhost": queue.labels["vhost"], "queue": queue.labels["name"], "key": key, "value": value}).Info("Set queue metric for key") - gaugevec.WithLabelValues(cluster, queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self).Set(value) - } - } - } - } - } - } - } - for _, queue := range rabbitMqQueueData { qname := queue.labels["name"] vname := queue.labels["vhost"] - if vhostIncluded := config.IncludeVHost.MatchString(vname); !vhostIncluded { continue } @@ -177,50 +182,51 @@ func (e exporterQueue) Collect(ctx context.Context, ch chan<- prometheus.Metric) if queue.labels["node"] == selfNode { self = "1" } + labelValues := []string{cluster, queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self} + + for key, gaugevec := range e.queueMetricsGauge { + if value, ok := queue.metrics[key]; ok { + // log.WithFields(log.Fields{"vhost": queue.labels["vhost"], "queue": queue.labels["name"], "key": key, "value": value}).Info("Set queue metric for key") + gaugevec.WithLabelValues(labelValues...).Set(value) + } + } + + for key, countvec := range e.queueMetricsCounter { + if value, ok := queue.metrics[key]; ok { + ch <- prometheus.MustNewConstMetric(countvec, prometheus.CounterValue, value, labelValues...) + } else { + ch <- prometheus.MustNewConstMetric(countvec, prometheus.CounterValue, 0, labelValues...) + } + } + state := queue.labels["state"] idleSince, exists := queue.labels["idle_since"] if exists && idleSince != "" { if t, err := time.Parse("2006-01-02 15:04:05", idleSince); err == nil { unixSeconds := float64(t.UnixNano()) / 1e9 - state := queue.labels["state"] + if state == "running" { //replace running state with idle if idle_since time is provided. Other states (flow, etc.) 
                    state = "idle"
                }
-                e.idleSinceMetric.WithLabelValues(cluster, queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self).Set(unixSeconds)
-                e.stateMetric.WithLabelValues(cluster, queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self, state).Set(1)
+                e.idleSinceMetric.WithLabelValues(labelValues...).Set(unixSeconds)
             } else {
                 log.WithError(err).WithField("idle_since", idleSince).Warn("error parsing idle since time")
             }
-        } else {
-            e.stateMetric.WithLabelValues(cluster, queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self, queue.labels["state"]).Set(1)
         }
-    }
+        e.stateMetric.WithLabelValues(append(labelValues, state)...).Set(1)

-    for key, countvec := range e.queueMetricsCounter {
-        for _, queue := range rabbitMqQueueData {
-            qname := queue.labels["name"]
-            vname := queue.labels["vhost"]
-
-            if matchVhost := config.IncludeVHost.MatchString(vname); matchVhost {
-                if skipVhost := config.SkipVHost.MatchString(vname); !skipVhost {
-                    if matchInclude := config.IncludeQueues.MatchString(qname); matchInclude {
-                        if matchSkip := config.SkipQueues.MatchString(qname); !matchSkip {
-                            self := "0"
-                            if queue.labels["node"] == selfNode {
-                                self = "1"
-                            }
-                            if value, ok := queue.metrics[key]; ok {
-                                ch <- prometheus.MustNewConstMetric(countvec, prometheus.CounterValue, value, cluster, queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self)
-                            } else {
-                                ch <- prometheus.MustNewConstMetric(countvec, prometheus.CounterValue, 0, cluster, queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self)
-                            }
-                        }
-                    }
-                }
-            }
+        if f := collectLowerMetric("arguments.x-max-length", "effective_policy_definition.max-length", queue); f >= 0 {
+            e.limitsGauge["max-length"].WithLabelValues(labelValues...).Set(f)
         }
+        if f := collectLowerMetric("arguments.x-max-length-bytes", "effective_policy_definition.max-length-bytes", queue); f >= 0 {
+            e.limitsGauge["max-length-bytes"].WithLabelValues(labelValues...).Set(f)
+        }
     }

+    for _, metric := range e.limitsGauge {
+        metric.Collect(ch)
+    }
     for _, gaugevec := range e.queueMetricsGauge {
         gaugevec.Collect(ch)
     }
@@ -231,6 +237,9 @@
 }

 func (e exporterQueue) Describe(ch chan<- *prometheus.Desc) {
+    for _, metric := range e.limitsGauge {
+        metric.Describe(ch)
+    }
     for _, gaugevec := range e.queueMetricsGauge {
         gaugevec.Describe(ch)
     }
diff --git a/exporter_test.go b/exporter_test.go
index d26f369..0a85bb1 100644
--- a/exporter_test.go
+++ b/exporter_test.go
@@ -87,6 +87,10 @@ func TestWholeApp(t *testing.T) {
     }
     body := w.Body.String()
     t.Log(body)
+    lines := strings.Split(body, "\n")
+    if lc := len(lines); lc != 372 {
+        t.Errorf("expected 372 lines, got %d", lc)
+    }
     expectSubstring(t, body, `rabbitmq_up{cluster="my-rabbit@ae74c041248b",node="my-rabbit@ae74c041248b"} 1`)

     // overview
diff --git a/testenv/testenv.go b/testenv/testenv.go
index 0810dca..84126d1 100644
--- a/testenv/testenv.go
+++ b/testenv/testenv.go
@@ -137,7 +137,7 @@ func (tenv *TestEnvironment) MustSetPolicy(name string, pattern string) {
     client := &http.Client{}
     request, err := http.NewRequest("PUT", url, strings.NewReader(policy))
     if err != nil {
-        log.Fatal(fmt.Errorf("could not create NewRequst: %w", err))
+        log.Fatal(fmt.Errorf("could not create NewRequest: %w", err))
     }
     request.Header.Add("Content-Type", "application/json")
"application/json") request.ContentLength = int64(len(policy))