From 9ece896d9a78aecf53cf73a826f22d82bfb5e09d Mon Sep 17 00:00:00 2001 From: Joe Adams Date: Fri, 13 May 2022 15:39:19 -0400 Subject: [PATCH] Refactor cluster info collector (#536) * Refactor cluster info collector Adds new cluster info collector. This will eventually replace the cluster info collector in pkg/clusterinfo. The current limitation in fully replacing that package is that it provides downstream collectors with the cluster info to apply the cluster name label. I plan to find a better solution for this when I refactor the next collector. Updates the Collector interactions to more closely match node_exporter and postgres_exporter style of registering collectors. Signed-off-by: Joe Adams * Fix ci Signed-off-by: Joe Adams --- collector/cluster_info.go | 109 ++++++++++++++++++++++++++++++++ collector/collector.go | 127 ++++++++++++++++++++++++++++++++++++-- main.go | 14 +++++ 3 files changed, 245 insertions(+), 5 deletions(-) create mode 100644 collector/cluster_info.go diff --git a/collector/cluster_info.go b/collector/cluster_info.go new file mode 100644 index 00000000..3649a214 --- /dev/null +++ b/collector/cluster_info.go @@ -0,0 +1,109 @@ +// Copyright 2022 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + "encoding/json" + "io/ioutil" + "net/http" + "net/url" + + "github.com/blang/semver" + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" +) + +func init() { + registerCollector("cluster-info", defaultEnabled, NewClusterInfo) +} + +type ClusterInfoCollector struct { + logger log.Logger + u *url.URL + hc *http.Client +} + +func NewClusterInfo(logger log.Logger, u *url.URL, hc *http.Client) (Collector, error) { + return &ClusterInfoCollector{ + logger: logger, + u: u, + hc: hc, + }, nil +} + +var clusterInfoDesc = map[string]*prometheus.Desc{ + "version": prometheus.NewDesc( + prometheus.BuildFQName(namespace, "", "version"), + "Elasticsearch version information.", + []string{ + "cluster", + "cluster_uuid", + "build_date", + "build_hash", + "version", + "lucene_version", + }, + nil, + ), +} + +// ClusterInfoResponse is the cluster info retrievable from the / endpoint +type ClusterInfoResponse struct { + Name string `json:"name"` + ClusterName string `json:"cluster_name"` + ClusterUUID string `json:"cluster_uuid"` + Version VersionInfo `json:"version"` + Tagline string `json:"tagline"` +} + +// VersionInfo is the version info retrievable from the / endpoint, embedded in ClusterInfoResponse +type VersionInfo struct { + Number semver.Version `json:"number"` + BuildHash string `json:"build_hash"` + BuildDate string `json:"build_date"` + BuildSnapshot bool `json:"build_snapshot"` + LuceneVersion semver.Version `json:"lucene_version"` +} + +func (c *ClusterInfoCollector) Update(ctx context.Context, ch chan<- prometheus.Metric) error { + resp, err := c.hc.Get(c.u.String()) + if err != nil { + return err + } + defer resp.Body.Close() + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + var info ClusterInfoResponse + err = json.Unmarshal(b, &info) + if err != nil { + return err + } + + ch <- prometheus.MustNewConstMetric( + clusterInfoDesc["version"], + prometheus.GaugeValue, + 1, + info.ClusterName, + info.ClusterUUID, + info.Version.BuildDate, + info.Version.BuildHash, + info.Version.Number.String(), + info.Version.LuceneVersion.String(), + ) + + return nil +} diff --git a/collector/collector.go b/collector/collector.go index e31141f3..c08a9994 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -16,6 +16,8 @@ package collector import ( "context" + "errors" + "fmt" "net/http" "net/url" "sync" @@ -24,10 +26,26 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" + "gopkg.in/alecthomas/kingpin.v2" ) -// Namespace defines the common namespace to be used by all metrics. -const namespace = "elasticsearch" +const ( + // Namespace defines the common namespace to be used by all metrics. + namespace = "elasticsearch" + + defaultEnabled = true + // defaultDisabled = false +) + +type factoryFunc func(logger log.Logger, u *url.URL, hc *http.Client) (Collector, error) + +var ( + factories = make(map[string]factoryFunc) + initiatedCollectorsMtx = sync.Mutex{} + initiatedCollectors = make(map[string]Collector) + collectorState = make(map[string]*bool) + forcedCollectors = map[string]bool{} // collectors which have been explicitly enabled or disabled +) var ( scrapeDurationDesc = prometheus.NewDesc( @@ -50,16 +68,92 @@ type Collector interface { Update(context.Context, chan<- prometheus.Metric) error } +func registerCollector(name string, isDefaultEnabled bool, createFunc factoryFunc) { + var helpDefaultState string + if isDefaultEnabled { + helpDefaultState = "enabled" + } else { + helpDefaultState = "disabled" + } + + // Create flag for this collector + flagName := fmt.Sprintf("collector.%s", name) + flagHelp := fmt.Sprintf("Enable the %s collector (default: %s).", name, helpDefaultState) + defaultValue := fmt.Sprintf("%v", isDefaultEnabled) + + flag := kingpin.Flag(flagName, flagHelp).Default(defaultValue).Action(collectorFlagAction(name)).Bool() + collectorState[name] = flag + + // Register the create function for this collector + factories[name] = createFunc +} + type ElasticsearchCollector struct { Collectors map[string]Collector logger log.Logger + esURL *url.URL + httpClient *http.Client } +type Option func(*ElasticsearchCollector) error + // NewElasticsearchCollector creates a new ElasticsearchCollector -func NewElasticsearchCollector(logger log.Logger, httpClient *http.Client, esURL *url.URL) (*ElasticsearchCollector, error) { +func NewElasticsearchCollector(logger log.Logger, filters []string, options ...Option) (*ElasticsearchCollector, error) { + e := &ElasticsearchCollector{logger: logger} + // Apply options to customize the collector + for _, o := range options { + if err := o(e); err != nil { + return nil, err + } + } + + f := make(map[string]bool) + for _, filter := range filters { + enabled, exist := collectorState[filter] + if !exist { + return nil, fmt.Errorf("missing collector: %s", filter) + } + if !*enabled { + return nil, fmt.Errorf("disabled collector: %s", filter) + } + f[filter] = true + } collectors := make(map[string]Collector) + initiatedCollectorsMtx.Lock() + defer initiatedCollectorsMtx.Unlock() + for key, enabled := range collectorState { + if !*enabled || (len(f) > 0 && !f[key]) { + continue + } + if collector, ok := initiatedCollectors[key]; ok { + collectors[key] = collector + } else { + collector, err := factories[key](log.With(logger, "collector", key), e.esURL, e.httpClient) + if err != nil { + return nil, err + } + collectors[key] = collector + initiatedCollectors[key] = collector + } + } + + e.Collectors = collectors + + return e, nil +} - return &ElasticsearchCollector{Collectors: collectors, logger: logger}, nil +func WithElasticsearchURL(esURL *url.URL) Option { + return func(e *ElasticsearchCollector) error { + e.esURL = esURL + return nil + } +} + +func WithHTTPClient(hc *http.Client) Option { + return func(e *ElasticsearchCollector) error { + e.httpClient = hc + return nil + } } // Describe implements the prometheus.Collector interface. @@ -89,7 +183,11 @@ func execute(ctx context.Context, name string, c Collector, ch chan<- prometheus var success float64 if err != nil { - _ = level.Error(logger).Log("msg", "collector failed", "name", name, "duration_seconds", duration.Seconds(), "err", err) + if IsNoDataError(err) { + _ = level.Debug(logger).Log("msg", "collector returned no data", "name", name, "duration_seconds", duration.Seconds(), "err", err) + } else { + _ = level.Error(logger).Log("msg", "collector failed", "name", name, "duration_seconds", duration.Seconds(), "err", err) + } success = 0 } else { _ = level.Debug(logger).Log("msg", "collector succeeded", "name", name, "duration_seconds", duration.Seconds()) @@ -98,3 +196,22 @@ func execute(ctx context.Context, name string, c Collector, ch chan<- prometheus ch <- prometheus.MustNewConstMetric(scrapeDurationDesc, prometheus.GaugeValue, duration.Seconds(), name) ch <- prometheus.MustNewConstMetric(scrapeSuccessDesc, prometheus.GaugeValue, success, name) } + +// collectorFlagAction generates a new action function for the given collector +// to track whether it has been explicitly enabled or disabled from the command line. +// A new action function is needed for each collector flag because the ParseContext +// does not contain information about which flag called the action. +// See: https://github.com/alecthomas/kingpin/issues/294 +func collectorFlagAction(collector string) func(ctx *kingpin.ParseContext) error { + return func(ctx *kingpin.ParseContext) error { + forcedCollectors[collector] = true + return nil + } +} + +// ErrNoData indicates the collector found no data to collect, but had no other error. +var ErrNoData = errors.New("collector returned no data") + +func IsNoDataError(err error) bool { + return err == ErrNoData +} diff --git a/main.go b/main.go index 9aa9fe88..3b121207 100644 --- a/main.go +++ b/main.go @@ -160,6 +160,20 @@ func main() { // version metric prometheus.MustRegister(version.NewCollector(name)) + // create the exporter + exporter, err := collector.NewElasticsearchCollector( + logger, + []string{}, + collector.WithElasticsearchURL(esURL), + collector.WithHTTPClient(httpClient), + ) + if err != nil { + _ = level.Error(logger).Log("msg", "failed to create Elasticsearch collector", "err", err) + os.Exit(1) + } + prometheus.MustRegister(exporter) + + // TODO(@sysadmind): Remove this when we have a better way to get the cluster name to down stream collectors. // cluster info retriever clusterInfoRetriever := clusterinfo.New(logger, httpClient, esURL, *esClusterInfoInterval)