From d4a526c34c0011c09ea29a9e3c2f305d7bfb40a9 Mon Sep 17 00:00:00 2001 From: Louis-Philippe Huberdeau Date: Tue, 19 Jul 2022 11:46:04 -0400 Subject: [PATCH] Add option to make the self label uniform regardless of which node is hit by the load balancer --- README.md | 1 + config.go | 18 ++++++++++++++++++ config_test.go | 36 ++++++++++++++++++++++++++++++++++++ exporter_connections.go | 10 ++-------- exporter_federation.go | 5 +---- exporter_memory.go | 5 +---- exporter_node.go | 5 +---- exporter_queue.go | 5 +---- exporter_shovel.go | 5 +---- exporter_test.go | 17 +++++++++++++++++ main.go | 1 + 11 files changed, 80 insertions(+), 28 deletions(-) diff --git a/README.md b/README.md index 9bf555c..201bcb7 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,7 @@ Environment variable|default|description RABBIT_URL | | url to rabbitMQ management plugin (must start with http(s)://) RABBIT_USER | guest | username for rabbitMQ management plugin. User needs monitoring tag! RABBIT_PASSWORD | guest | password for rabbitMQ management plugin +RABBIT_CONNECTION | direct | direct or loadbalancer, strips the self label when loadbalancer RABBIT_USER_FILE| | location of file with username (useful for docker secrets) RABBIT_PASSWORD_FILE | | location of file with password (useful for docker secrets) PUBLISH_PORT | 9419 | Listening port for the exporter diff --git a/config.go b/config.go index ff2c165..04a5b98 100644 --- a/config.go +++ b/config.go @@ -17,6 +17,7 @@ var ( RabbitURL: "http://127.0.0.1:15672", RabbitUsername: "guest", RabbitPassword: "guest", + RabbitConnection: "direct", PublishPort: "9419", PublishAddr: "", OutputFormat: "TTY", //JSON @@ -43,6 +44,7 @@ type rabbitExporterConfig struct { RabbitURL string `json:"rabbit_url"` RabbitUsername string `json:"rabbit_user"` RabbitPassword string `json:"rabbit_pass"` + RabbitConnection string `json:"rabbit_connection"` PublishPort string `json:"publish_port"` PublishAddr string `json:"publish_addr"` OutputFormat string `json:"output_format"` @@ -117,6 +119,14 @@ func initConfig() { } } + if connection := os.Getenv("RABBIT_CONNECTION"); connection != "" { + if valid, _ := regexp.MatchString("(direct|loadbalancer)", connection); valid { + config.RabbitConnection = connection + } else { + panic(fmt.Errorf("rabbit connection must be direct or loadbalancer")) + } + } + var user string var pass string @@ -251,3 +261,11 @@ func isCapEnabled(config rabbitExporterConfig, cap rabbitCapability) bool { exists, enabled := config.RabbitCapabilities[cap] return exists && enabled } + +func selfLabel(config rabbitExporterConfig, isSelf bool) string { + if config.RabbitConnection == "loadbalancer" || isSelf { + return "1" + } else { + return "0" + } +} diff --git a/config_test.go b/config_test.go index 3b1578c..12fd575 100644 --- a/config_test.go +++ b/config_test.go @@ -179,3 +179,39 @@ func TestConfig_EnabledExporters(t *testing.T) { t.Errorf("Invalid Exporters list. diff\n%v", diff) } } + +func TestConfig_RabbitConnection_Default(t *testing.T) { + defer os.Unsetenv("RABBIT_CONNECTION") + + os.Unsetenv("RABBIT_CONNECTION") + initConfig() + + if config.RabbitConnection != "direct" { + t.Errorf("RabbitConnection unspecified. It should default to direct. expected=%v,got=%v", "direct", config.RabbitConnection) + } +} + +func TestConfig_RabbitConnection_LoadBalaner(t *testing.T) { + newValue := "loadbalancer" + defer os.Unsetenv("RABBIT_CONNECTION") + + os.Setenv("RABBIT_CONNECTION", newValue) + initConfig() + + if config.RabbitConnection != newValue { + t.Errorf("RabbitConnection specified. It should be modified. expected=%v,got=%v", newValue, config.RabbitConnection) + } +} + +func TestConfig_RabbitConnection_Invalid(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Errorf("initConfig should panic on invalid rabbit connection config") + } + }() + newValue := "invalid" + defer os.Unsetenv("RABBIT_CONNECTION") + + os.Setenv("RABBIT_CONNECTION", newValue) + initConfig() +} diff --git a/exporter_connections.go b/exporter_connections.go index b6e9643..d6e8352 100644 --- a/exporter_connections.go +++ b/exporter_connections.go @@ -70,20 +70,14 @@ func (e exporterConnections) Collect(ctx context.Context, ch chan<- prometheus.M for key, gauge := range e.connectionMetricsG { for _, connD := range rabbitConnectionResponses { if value, ok := connD.metrics[key]; ok { - self := "0" - if connD.labels["node"] == selfNode { - self = "1" - } + self := selfLabel(config, connD.labels["node"] == selfNode) gauge.WithLabelValues(cluster, connD.labels["vhost"], connD.labels["node"], connD.labels["peer_host"], connD.labels["user"], self).Add(value) } } } for _, connD := range rabbitConnectionResponses { - self := "0" - if connD.labels["node"] == selfNode { - self = "1" - } + self := selfLabel(config, connD.labels["node"] == selfNode) e.stateMetric.WithLabelValues(cluster, connD.labels["vhost"], connD.labels["node"], connD.labels["peer_host"], connD.labels["user"], connD.labels["state"], self).Add(1) } diff --git a/exporter_federation.go b/exporter_federation.go index e6d96c3..f0fa1c9 100644 --- a/exporter_federation.go +++ b/exporter_federation.go @@ -43,10 +43,7 @@ func (e exporterFederation) Collect(ctx context.Context, ch chan<- prometheus.Me } for _, federation := range federationData { - self := "0" - if federation.labels["node"] == selfNode { - self = "1" - } + self := selfLabel(config, federation.labels["node"] == selfNode) e.stateMetric.WithLabelValues(cluster, federation.labels["vhost"], federation.labels["node"], federation.labels["queue"], federation.labels["exchange"], self, federation.labels["status"]).Set(1) } diff --git a/exporter_memory.go b/exporter_memory.go index 1fbdbd7..a65daad 100644 --- a/exporter_memory.go +++ b/exporter_memory.go @@ -167,10 +167,7 @@ func (e exporterMemory) Collect(ctx context.Context, ch chan<- prometheus.Metric } for _, node := range nodeData { - self := "0" - if node.labels["name"] == selfNode { - self = "1" - } + self := selfLabel(config, node.labels["name"] == selfNode) rabbitMemoryResponses, err := getMetricMap(config, fmt.Sprintf("nodes/%s/memory", node.labels["name"])) if err != nil { return err diff --git a/exporter_node.go b/exporter_node.go index f64e4fe..74ecb29 100644 --- a/exporter_node.go +++ b/exporter_node.go @@ -74,10 +74,7 @@ func (e exporterNode) Collect(ctx context.Context, ch chan<- prometheus.Metric) for key, gauge := range e.nodeMetricsGauge { for _, node := range nodeData { if value, ok := node.metrics[key]; ok { - self := "0" - if node.labels["name"] == selfNode { - self = "1" - } + self := selfLabel(config, node.labels["name"] == selfNode) gauge.WithLabelValues(cluster, node.labels["name"], self).Set(value) } } diff --git a/exporter_queue.go b/exporter_queue.go index c62b319..c45e4d2 100644 --- a/exporter_queue.go +++ b/exporter_queue.go @@ -177,10 +177,7 @@ func (e exporterQueue) Collect(ctx context.Context, ch chan<- prometheus.Metric) continue } - self := "0" - if queue.labels["node"] == selfNode { - self = "1" - } + self := selfLabel(config, queue.labels["node"] == selfNode) labelValues := []string{cluster, queue.labels["vhost"], queue.labels["name"], queue.labels["durable"], queue.labels["policy"], self} for key, gaugevec := range e.queueMetricsGauge { diff --git a/exporter_shovel.go b/exporter_shovel.go index 72f08f8..9b496ae 100644 --- a/exporter_shovel.go +++ b/exporter_shovel.go @@ -45,10 +45,7 @@ func (e exporterShovel) Collect(ctx context.Context, ch chan<- prometheus.Metric } for _, shovel := range shovelData { - self := "0" - if shovel.labels["node"] == selfNode { - self = "1" - } + self := selfLabel(config, shovel.labels["node"] == selfNode) e.stateMetric.WithLabelValues(cluster, shovel.labels["vhost"], shovel.labels["name"], shovel.labels["type"], self, shovel.labels["state"]).Set(1) } diff --git a/exporter_test.go b/exporter_test.go index 96db97a..86e960c 100644 --- a/exporter_test.go +++ b/exporter_test.go @@ -424,6 +424,23 @@ func TestResetMetricsOnRabbitFailure(t *testing.T) { dontExpectSubstring(t, body, `rabbitmq_connection_received_packets{cluster="my-rabbit@ae74c041248b",node="rabbit@rmq-cluster-node-04",peer_host="172.31.0.130",self="1",user="rmq_oms",vhost="/"}`) }) + t.Run("RabbitMQ is using loadbalancer -> self is always 1", func(t *testing.T) { + rabbitUP = true + rabbitQueuesUp = true + config.RabbitConnection = "loadbalaner" + req, _ := http.NewRequest("GET", "", nil) + w := httptest.NewRecorder() + promhttp.Handler().ServeHTTP(w, req) + if w.Code != http.StatusOK { + t.Errorf("Home page didn't return %v", http.StatusOK) + } + body := w.Body.String() + t.Log(body) + + // queue + expectSubstring(t, body, `rabbitmq_queue_messages_ready{cluster="my-rabbit@ae74c041248b",durable="true",policy="ha-2",queue="myQueue2",self="1",vhost="/"} 25`) + }) + } func TestExporter(t *testing.T) { diff --git a/main.go b/main.go index e240a85..b2314ff 100644 --- a/main.go +++ b/main.go @@ -64,6 +64,7 @@ func main() { "PUBLISH_PORT": config.PublishPort, "RABBIT_URL": config.RabbitURL, "RABBIT_USER": config.RabbitUsername, + "RABBIT_CONNECTION": config.RabbitConnection, "OUTPUT_FORMAT": config.OutputFormat, "RABBIT_CAPABILITIES": formatCapabilities(config.RabbitCapabilities), "RABBIT_EXPORTERS": config.EnabledExporters,