Skip to content

Commit

Permalink
Refactor cluster info collector (prometheus-community#536)
Browse files Browse the repository at this point in the history
* Refactor cluster info collector

Adds new cluster info collector. This will eventually replace the cluster info collector in pkg/clusterinfo. The current limitation in fully replacing that package is that it provides downstream collectors with the cluster info to apply the cluster name label. I plan to find a better solution for this when I refactor the next collector.

Updates the Collector interactions to more closely match node_exporter and postgres_exporter style of registering collectors.

Signed-off-by: Joe Adams <[email protected]>

* Fix ci

Signed-off-by: Joe Adams <[email protected]>
  • Loading branch information
sysadmind authored May 13, 2022
1 parent 2b48011 commit 9ece896
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 5 deletions.
109 changes: 109 additions & 0 deletions collector/cluster_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2022 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
	"context"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"

	"github.com/blang/semver"
	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"
)

// init registers the cluster info collector under the name "cluster-info",
// passing defaultEnabled as its default state and NewClusterInfo as its factory.
func init() {
registerCollector("cluster-info", defaultEnabled, NewClusterInfo)
}

// ClusterInfoCollector collects Elasticsearch version information by
// requesting the URL it was constructed with.
type ClusterInfoCollector struct {
// logger is the logger supplied to NewClusterInfo.
logger log.Logger
// u is the URL requested by Update.
u *url.URL
// hc is the HTTP client used to perform the request.
hc *http.Client
}

// NewClusterInfo builds a ClusterInfoCollector from the given logger,
// Elasticsearch URL and HTTP client. It matches the factoryFunc signature so
// it can be passed to registerCollector; the returned error is always nil.
func NewClusterInfo(logger log.Logger, u *url.URL, hc *http.Client) (Collector, error) {
	c := new(ClusterInfoCollector)
	c.logger = logger
	c.u = u
	c.hc = hc
	return c, nil
}

// clusterInfoDesc holds the metric descriptors used by ClusterInfoCollector,
// keyed by a short metric name. The order of the label names below must match
// the order of the label values passed in Update.
var clusterInfoDesc = map[string]*prometheus.Desc{
"version": prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "version"),
"Elasticsearch version information.",
[]string{
"cluster",
"cluster_uuid",
"build_date",
"build_hash",
"version",
"lucene_version",
},
nil,
),
}

// ClusterInfoResponse is the cluster info retrievable from the / endpoint.
// Fields are populated from the JSON keys named in the struct tags.
type ClusterInfoResponse struct {
Name string `json:"name"`
ClusterName string `json:"cluster_name"`
ClusterUUID string `json:"cluster_uuid"`
Version VersionInfo `json:"version"`
Tagline string `json:"tagline"`
}

// VersionInfo is the version info retrievable from the / endpoint, embedded
// in ClusterInfoResponse under the "version" key.
type VersionInfo struct {
// Number and LuceneVersion are decoded as semantic versions.
Number semver.Version `json:"number"`
BuildHash string `json:"build_hash"`
BuildDate string `json:"build_date"`
BuildSnapshot bool `json:"build_snapshot"`
LuceneVersion semver.Version `json:"lucene_version"`
}

// Update fetches the cluster info document from the configured URL, decodes
// it into a ClusterInfoResponse and sends a single constant "version" gauge
// with value 1 on ch, carrying the cluster name, cluster UUID, build date,
// build hash, Elasticsearch version and Lucene version as labels.
//
// The request is bound to ctx, so cancelling the context aborts the HTTP call.
// A response with a non-2xx status is returned as an error instead of being
// decoded, since an error payload could otherwise decode into a response with
// empty fields and be exported as a metric with empty labels.
//
// It returns any error from building or performing the request, reading the
// body, or decoding the JSON.
func (c *ClusterInfoCollector) Update(ctx context.Context, ch chan<- prometheus.Metric) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.u.String(), nil)
	if err != nil {
		return err
	}

	resp, err := c.hc.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return fmt.Errorf("cluster info request returned HTTP status %d", resp.StatusCode)
	}

	b, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return err
	}

	var info ClusterInfoResponse
	if err := json.Unmarshal(b, &info); err != nil {
		return err
	}

	// Label values must be passed in the same order as the label names
	// declared for clusterInfoDesc["version"].
	ch <- prometheus.MustNewConstMetric(
		clusterInfoDesc["version"],
		prometheus.GaugeValue,
		1,
		info.ClusterName,
		info.ClusterUUID,
		info.Version.BuildDate,
		info.Version.BuildHash,
		info.Version.Number.String(),
		info.Version.LuceneVersion.String(),
	)

	return nil
}
127 changes: 122 additions & 5 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package collector

import (
"context"
"errors"
"fmt"
"net/http"
"net/url"
"sync"
Expand All @@ -24,10 +26,26 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"gopkg.in/alecthomas/kingpin.v2"
)

// Namespace defines the common namespace to be used by all metrics.
const namespace = "elasticsearch"
const (
// namespace defines the common namespace to be used by all metrics.
namespace = "elasticsearch"

// defaultEnabled is the value passed to registerCollector for collectors
// that are enabled by default.
defaultEnabled = true
// defaultDisabled = false
)

// factoryFunc constructs a Collector from a logger, a URL and an HTTP client.
// Collectors hand one of these to registerCollector.
type factoryFunc func(logger log.Logger, u *url.URL, hc *http.Client) (Collector, error)

var (
// factories maps a collector name to the function that creates it;
// entries are added by registerCollector.
factories = make(map[string]factoryFunc)
// initiatedCollectorsMtx is held by NewElasticsearchCollector while it
// reads and fills initiatedCollectors.
initiatedCollectorsMtx = sync.Mutex{}
// initiatedCollectors caches collectors that have already been created,
// keyed by collector name.
initiatedCollectors = make(map[string]Collector)
// collectorState maps a collector name to the value of its
// "collector.<name>" command-line flag.
collectorState = make(map[string]*bool)
forcedCollectors = map[string]bool{} // collectors which have been explicitly enabled or disabled
)

var (
scrapeDurationDesc = prometheus.NewDesc(
Expand All @@ -50,16 +68,92 @@ type Collector interface {
Update(context.Context, chan<- prometheus.Metric) error
}

// registerCollector makes a collector known to the package under name. It
// defines a boolean "collector.<name>" command-line flag whose default is
// isDefaultEnabled, stores the flag value in collectorState, and records
// createFunc in factories so the collector can be built later.
func registerCollector(name string, isDefaultEnabled bool, createFunc factoryFunc) {
	// Human-readable default shown in the flag's help text.
	stateText := "disabled"
	if isDefaultEnabled {
		stateText = "enabled"
	}

	// Define the flag that toggles this collector; the attached action notes
	// when the flag was given explicitly on the command line.
	collectorState[name] = kingpin.
		Flag(
			fmt.Sprintf("collector.%s", name),
			fmt.Sprintf("Enable the %s collector (default: %s).", name, stateText),
		).
		Default(fmt.Sprintf("%v", isDefaultEnabled)).
		Action(collectorFlagAction(name)).
		Bool()

	// Remember how to construct the collector.
	factories[name] = createFunc
}

// ElasticsearchCollector groups the enabled collectors together with the
// logger, Elasticsearch URL and HTTP client they are created with.
type ElasticsearchCollector struct {
// Collectors holds the selected collectors keyed by collector name.
Collectors map[string]Collector
logger log.Logger
// esURL and httpClient are set through Option functions and passed to
// each collector factory.
esURL *url.URL
httpClient *http.Client
}

// Option customizes an ElasticsearchCollector during construction. A non-nil
// error returned by an Option makes NewElasticsearchCollector fail.
type Option func(*ElasticsearchCollector) error

// NewElasticsearchCollector creates a new ElasticsearchCollector
func NewElasticsearchCollector(logger log.Logger, httpClient *http.Client, esURL *url.URL) (*ElasticsearchCollector, error) {
// NewElasticsearchCollector creates a new ElasticsearchCollector.
//
// The options are applied first. Each name in filters must refer to a
// registered and enabled collector, otherwise an error is returned. When
// filters is empty every enabled collector is selected; otherwise only the
// named ones are. Collectors are created once through their registered
// factory and reused from the package-level cache on later calls.
func NewElasticsearchCollector(logger log.Logger, filters []string, options ...Option) (*ElasticsearchCollector, error) {
	e := &ElasticsearchCollector{logger: logger}

	// Let the caller customize the collector before anything is built.
	for _, apply := range options {
		if err := apply(e); err != nil {
			return nil, err
		}
	}

	// Validate the requested names and turn them into a lookup set.
	wanted := make(map[string]bool)
	for _, name := range filters {
		state, known := collectorState[name]
		switch {
		case !known:
			return nil, fmt.Errorf("missing collector: %s", name)
		case !*state:
			return nil, fmt.Errorf("disabled collector: %s", name)
		}
		wanted[name] = true
	}

	selected := make(map[string]Collector)

	initiatedCollectorsMtx.Lock()
	defer initiatedCollectorsMtx.Unlock()

	for name, state := range collectorState {
		if !*state {
			continue
		}
		if len(wanted) > 0 && !wanted[name] {
			continue
		}

		// Reuse a collector that has already been created.
		if existing, ok := initiatedCollectors[name]; ok {
			selected[name] = existing
			continue
		}

		created, err := factories[name](log.With(logger, "collector", name), e.esURL, e.httpClient)
		if err != nil {
			return nil, err
		}
		selected[name] = created
		initiatedCollectors[name] = created
	}

	e.Collectors = selected

	return e, nil
}

return &ElasticsearchCollector{Collectors: collectors, logger: logger}, nil
// WithElasticsearchURL returns an Option that sets the URL handed to each
// collector factory. The returned Option never fails.
func WithElasticsearchURL(esURL *url.URL) Option {
return func(e *ElasticsearchCollector) error {
e.esURL = esURL
return nil
}
}

// WithHTTPClient returns an Option that sets the HTTP client handed to each
// collector factory. The returned Option never fails.
func WithHTTPClient(hc *http.Client) Option {
return func(e *ElasticsearchCollector) error {
e.httpClient = hc
return nil
}
}

// Describe implements the prometheus.Collector interface.
Expand Down Expand Up @@ -89,7 +183,11 @@ func execute(ctx context.Context, name string, c Collector, ch chan<- prometheus
var success float64

if err != nil {
_ = level.Error(logger).Log("msg", "collector failed", "name", name, "duration_seconds", duration.Seconds(), "err", err)
if IsNoDataError(err) {
_ = level.Debug(logger).Log("msg", "collector returned no data", "name", name, "duration_seconds", duration.Seconds(), "err", err)
} else {
_ = level.Error(logger).Log("msg", "collector failed", "name", name, "duration_seconds", duration.Seconds(), "err", err)
}
success = 0
} else {
_ = level.Debug(logger).Log("msg", "collector succeeded", "name", name, "duration_seconds", duration.Seconds())
Expand All @@ -98,3 +196,22 @@ func execute(ctx context.Context, name string, c Collector, ch chan<- prometheus
ch <- prometheus.MustNewConstMetric(scrapeDurationDesc, prometheus.GaugeValue, duration.Seconds(), name)
ch <- prometheus.MustNewConstMetric(scrapeSuccessDesc, prometheus.GaugeValue, success, name)
}

// collectorFlagAction builds the kingpin action attached to a collector's
// flag. When the action runs it records the collector in forcedCollectors,
// marking it as explicitly enabled or disabled on the command line.
// Each flag needs its own closure because the ParseContext does not say
// which flag triggered the action.
// See: https://github.com/alecthomas/kingpin/issues/294
func collectorFlagAction(collector string) func(ctx *kingpin.ParseContext) error {
	markForced := func(_ *kingpin.ParseContext) error {
		forcedCollectors[collector] = true
		return nil
	}
	return markForced
}

// ErrNoData indicates the collector found no data to collect, but had no other error.
var ErrNoData = errors.New("collector returned no data")

// IsNoDataError reports whether err is ErrNoData or wraps it. Using errors.Is
// rather than a direct comparison means a collector that adds context with
// fmt.Errorf("...: %w", ErrNoData) is still recognized as having no data.
// It returns false for a nil error.
func IsNoDataError(err error) bool {
	return errors.Is(err, ErrNoData)
}
14 changes: 14 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,20 @@ func main() {
// version metric
prometheus.MustRegister(version.NewCollector(name))

// create the exporter
exporter, err := collector.NewElasticsearchCollector(
logger,
[]string{},
collector.WithElasticsearchURL(esURL),
collector.WithHTTPClient(httpClient),
)
if err != nil {
_ = level.Error(logger).Log("msg", "failed to create Elasticsearch collector", "err", err)
os.Exit(1)
}
prometheus.MustRegister(exporter)

// TODO(@sysadmind): Remove this when we have a better way to get the cluster name to down stream collectors.
// cluster info retriever
clusterInfoRetriever := clusterinfo.New(logger, httpClient, esURL, *esClusterInfoInterval)

Expand Down

0 comments on commit 9ece896

Please sign in to comment.