Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor cluster info collector #536

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions collector/cluster_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2022 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
	"context"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"

	"github.com/blang/semver"
	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"
)

// init registers the "cluster-info" collector with the package registry,
// enabled by default and constructed through NewClusterInfo.
func init() {
registerCollector("cluster-info", defaultEnabled, NewClusterInfo)
}

// ClusterInfoCollector is the collector that emits the Elasticsearch version
// info metric. Its Update method performs an HTTP GET against u using hc.
type ClusterInfoCollector struct {
// logger is the logger handed to NewClusterInfo.
logger log.Logger
// u is the URL requested on every Update call.
u *url.URL
// hc is the HTTP client used to perform that request.
hc *http.Client
}

// NewClusterInfo builds a ClusterInfoCollector from the given logger, target
// URL and HTTP client. It has the factoryFunc signature so it can be passed to
// registerCollector, and it never returns a non-nil error.
func NewClusterInfo(logger log.Logger, u *url.URL, hc *http.Client) (Collector, error) {
	c := new(ClusterInfoCollector)
	c.logger = logger
	c.u = u
	c.hc = hc
	return c, nil
}

// clusterInfoDesc holds the metric descriptors emitted by ClusterInfoCollector,
// keyed by a short name.
//
// The "version" descriptor is an info-style metric: Update always emits it with
// the value 1 and carries the actual information in its labels. The label
// order declared here must match the order of the label values passed to
// prometheus.MustNewConstMetric in Update.
var clusterInfoDesc = map[string]*prometheus.Desc{
"version": prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "version"),
"Elasticsearch version information.",
[]string{
"cluster",
"cluster_uuid",
"build_date",
"build_hash",
"version",
"lucene_version",
},
nil,
),
}

// ClusterInfoResponse is the cluster info retrievable from the / endpoint.
//
// Of these fields, Update only reads ClusterName, ClusterUUID and Version;
// Name and Tagline are decoded but not used by the collector in this file.
type ClusterInfoResponse struct {
Name string `json:"name"`
ClusterName string `json:"cluster_name"`
ClusterUUID string `json:"cluster_uuid"`
Version VersionInfo `json:"version"`
Tagline string `json:"tagline"`
}

// VersionInfo is the version info retrievable from the / endpoint, embedded in ClusterInfoResponse.
//
// Number and LuceneVersion are decoded as semver.Version rather than plain
// strings; Update renders them back with String() for the metric labels.
// NOTE(review): presumably decoding fails when the server reports a value that
// is not valid semver — confirm against the blang/semver JSON behaviour.
type VersionInfo struct {
Number semver.Version `json:"number"`
BuildHash string `json:"build_hash"`
BuildDate string `json:"build_date"`
BuildSnapshot bool `json:"build_snapshot"`
LuceneVersion semver.Version `json:"lucene_version"`
}

// Update queries the configured URL, decodes the response as a
// ClusterInfoResponse and sends a single "version" info metric (value 1) on ch
// whose labels carry the cluster name, cluster UUID, build date, build hash,
// Elasticsearch version and Lucene version.
//
// The request is bound to ctx, so cancelling the scrape or hitting its deadline
// aborts the HTTP call. An error is returned, and no metric is sent, when the
// request cannot be built or performed, when the body cannot be read, when the
// server answers with a status other than 200 OK, or when the body is not a
// valid cluster info document.
func (c *ClusterInfoCollector) Update(ctx context.Context, ch chan<- prometheus.Metric) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.u.String(), nil)
	if err != nil {
		return err
	}

	resp, err := c.hc.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// Read the body before looking at the status so the connection is fully
	// drained and can be reused by the transport even on error responses.
	b, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return err
	}

	// An error response would otherwise decode into an empty struct and be
	// exported as a version metric with blank labels.
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("HTTP request failed with status code %d", resp.StatusCode)
	}

	var info ClusterInfoResponse
	if err := json.Unmarshal(b, &info); err != nil {
		return err
	}

	ch <- prometheus.MustNewConstMetric(
		clusterInfoDesc["version"],
		prometheus.GaugeValue,
		1,
		info.ClusterName,
		info.ClusterUUID,
		info.Version.BuildDate,
		info.Version.BuildHash,
		info.Version.Number.String(),
		info.Version.LuceneVersion.String(),
	)

	return nil
}
127 changes: 122 additions & 5 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package collector

import (
"context"
"errors"
"fmt"
"net/http"
"net/url"
"sync"
Expand All @@ -24,10 +26,26 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"gopkg.in/alecthomas/kingpin.v2"
)

// Namespace defines the common namespace to be used by all metrics.
const namespace = "elasticsearch"
const (
// namespace defines the common namespace to be used by all metrics.
namespace = "elasticsearch"

// defaultEnabled is the value passed to registerCollector for collectors
// that should be active unless their command-line flag turns them off.
defaultEnabled = true
// The opt-in counterpart is kept commented out below.
// defaultDisabled = false
)

// factoryFunc is the constructor signature every registered collector must
// provide: it receives a logger, the Elasticsearch URL and the HTTP client and
// returns the ready-to-use Collector or an error.
type factoryFunc func(logger log.Logger, u *url.URL, hc *http.Client) (Collector, error)

// Package-level collector registry. factories and collectorState are filled by
// registerCollector; initiatedCollectors is filled by NewElasticsearchCollector.
var (
// factories maps a collector name to the function that constructs it.
factories = make(map[string]factoryFunc)
// initiatedCollectorsMtx is held by NewElasticsearchCollector while it reads and writes initiatedCollectors.
initiatedCollectorsMtx = sync.Mutex{}
// initiatedCollectors caches collectors that were already constructed so later
// NewElasticsearchCollector calls reuse them instead of invoking the factory again.
initiatedCollectors = make(map[string]Collector)
// collectorState maps a collector name to the bool behind its "collector.<name>" flag.
collectorState = make(map[string]*bool)
forcedCollectors = map[string]bool{} // collectors which have been explicitly enabled or disabled
)

var (
scrapeDurationDesc = prometheus.NewDesc(
Expand All @@ -50,16 +68,92 @@ type Collector interface {
Update(context.Context, chan<- prometheus.Metric) error
}

// registerCollector makes a collector known to the package under the given
// name. It stores createFunc as the collector's factory and defines a boolean
// "collector.<name>" command-line flag whose default is isDefaultEnabled; the
// pointer to the flag's value is kept in collectorState so the enabled state
// can be read after flag parsing.
func registerCollector(name string, isDefaultEnabled bool, createFunc factoryFunc) {
	// Remember how to build this collector.
	factories[name] = createFunc

	// Human-readable default shown in the flag's help text.
	helpDefaultState := "disabled"
	if isDefaultEnabled {
		helpDefaultState = "enabled"
	}

	// Define the flag step by step; the action records explicit use of the
	// flag on the command line.
	clause := kingpin.Flag(
		fmt.Sprintf("collector.%s", name),
		fmt.Sprintf("Enable the %s collector (default: %s).", name, helpDefaultState),
	)
	clause = clause.Default(fmt.Sprintf("%v", isDefaultEnabled))
	clause = clause.Action(collectorFlagAction(name))

	collectorState[name] = clause.Bool()
}

// ElasticsearchCollector bundles the enabled collectors together with the
// shared dependencies they are constructed from.
type ElasticsearchCollector struct {
// Collectors holds the selected collectors keyed by their registered name.
Collectors map[string]Collector
logger log.Logger
// esURL and httpClient are set through Options (WithElasticsearchURL,
// WithHTTPClient) and passed to each collector factory.
esURL *url.URL
httpClient *http.Client
}

// Option customizes an ElasticsearchCollector during construction. A non-nil
// error returned by an Option makes NewElasticsearchCollector fail with it.
type Option func(*ElasticsearchCollector) error

// NewElasticsearchCollector creates a new ElasticsearchCollector
func NewElasticsearchCollector(logger log.Logger, httpClient *http.Client, esURL *url.URL) (*ElasticsearchCollector, error) {
// NewElasticsearchCollector builds an ElasticsearchCollector containing every
// registered collector that is enabled by its flag.
//
// options are applied first, in order; the first one that fails aborts
// construction. When filters is non-empty only the named collectors are
// included, and naming an unregistered or disabled collector is an error.
// Collectors constructed by an earlier call are reused from the package-level
// cache rather than being built again.
func NewElasticsearchCollector(logger log.Logger, filters []string, options ...Option) (*ElasticsearchCollector, error) {
	e := &ElasticsearchCollector{logger: logger}
	for _, apply := range options {
		if err := apply(e); err != nil {
			return nil, err
		}
	}

	// Validate the requested names and turn them into a lookup set.
	wanted := make(map[string]bool)
	for _, name := range filters {
		state, known := collectorState[name]
		switch {
		case !known:
			return nil, fmt.Errorf("missing collector: %s", name)
		case !*state:
			return nil, fmt.Errorf("disabled collector: %s", name)
		}
		wanted[name] = true
	}

	// The cache of constructed collectors is shared package state.
	initiatedCollectorsMtx.Lock()
	defer initiatedCollectorsMtx.Unlock()

	selected := make(map[string]Collector)
	for name, state := range collectorState {
		if !*state {
			continue
		}
		if len(wanted) > 0 && !wanted[name] {
			continue
		}

		instance, cached := initiatedCollectors[name]
		if !cached {
			created, err := factories[name](log.With(logger, "collector", name), e.esURL, e.httpClient)
			if err != nil {
				return nil, err
			}
			initiatedCollectors[name] = created
			instance = created
		}
		selected[name] = instance
	}

	e.Collectors = selected
	return e, nil
}

return &ElasticsearchCollector{Collectors: collectors, logger: logger}, nil
// WithElasticsearchURL returns an Option that sets the Elasticsearch URL which
// NewElasticsearchCollector passes to the collector factories. The returned
// Option never fails.
func WithElasticsearchURL(esURL *url.URL) Option {
return func(e *ElasticsearchCollector) error {
e.esURL = esURL
return nil
}
}

// WithHTTPClient returns an Option that sets the HTTP client which
// NewElasticsearchCollector passes to the collector factories. The returned
// Option never fails.
func WithHTTPClient(hc *http.Client) Option {
return func(e *ElasticsearchCollector) error {
e.httpClient = hc
return nil
}
}

// Describe implements the prometheus.Collector interface.
Expand Down Expand Up @@ -89,7 +183,11 @@ func execute(ctx context.Context, name string, c Collector, ch chan<- prometheus
var success float64

if err != nil {
_ = level.Error(logger).Log("msg", "collector failed", "name", name, "duration_seconds", duration.Seconds(), "err", err)
if IsNoDataError(err) {
_ = level.Debug(logger).Log("msg", "collector returned no data", "name", name, "duration_seconds", duration.Seconds(), "err", err)
} else {
_ = level.Error(logger).Log("msg", "collector failed", "name", name, "duration_seconds", duration.Seconds(), "err", err)
}
success = 0
} else {
_ = level.Debug(logger).Log("msg", "collector succeeded", "name", name, "duration_seconds", duration.Seconds())
Expand All @@ -98,3 +196,22 @@ func execute(ctx context.Context, name string, c Collector, ch chan<- prometheus
ch <- prometheus.MustNewConstMetric(scrapeDurationDesc, prometheus.GaugeValue, duration.Seconds(), name)
ch <- prometheus.MustNewConstMetric(scrapeSuccessDesc, prometheus.GaugeValue, success, name)
}

// collectorFlagAction returns a kingpin action bound to one collector name.
// When the action runs it marks that collector in forcedCollectors, recording
// that its flag was given explicitly on the command line.
// Each flag needs its own closure because the ParseContext passed to an action
// does not say which flag triggered it.
// See: https://github.com/alecthomas/kingpin/issues/294
func collectorFlagAction(collector string) func(ctx *kingpin.ParseContext) error {
	markForced := func(_ *kingpin.ParseContext) error {
		forcedCollectors[collector] = true
		return nil
	}
	return markForced
}

// ErrNoData indicates the collector found no data to collect, but had no other error.
var ErrNoData = errors.New("collector returned no data")

// IsNoDataError reports whether err is ErrNoData or wraps it anywhere in its
// error chain. Using errors.Is instead of a direct comparison means a collector
// may add context with fmt.Errorf("...: %w", ErrNoData) and still be treated as
// "no data" rather than as a failure. A nil err yields false.
func IsNoDataError(err error) bool {
	return errors.Is(err, ErrNoData)
}
14 changes: 14 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,20 @@ func main() {
// version metric
prometheus.MustRegister(version.NewCollector(name))

// create the exporter
exporter, err := collector.NewElasticsearchCollector(
logger,
[]string{},
collector.WithElasticsearchURL(esURL),
collector.WithHTTPClient(httpClient),
)
if err != nil {
_ = level.Error(logger).Log("msg", "failed to create Elasticsearch collector", "err", err)
os.Exit(1)
}
prometheus.MustRegister(exporter)

// TODO(@sysadmind): Remove this when we have a better way to get the cluster name to downstream collectors.
// cluster info retriever
clusterInfoRetriever := clusterinfo.New(logger, httpClient, esURL, *esClusterInfoInterval)

Expand Down