Skip to content

Commit 9ece896

Browse files
authored
Refactor cluster info collector (#536)
* Refactor cluster info collector Adds new cluster info collector. This will eventually replace the cluster info collector in pkg/clusterinfo. The current limitation in fully replacing that package is that it provides downstream collectors with the cluster info to apply the cluster name label. I plan to find a better solution for this when I refactor the next collector. Updates the Collector interactions to more closely match node_exporter and postgres_exporter style of registering collectors. Signed-off-by: Joe Adams <[email protected]> * Fix ci Signed-off-by: Joe Adams <[email protected]>
1 parent 2b48011 commit 9ece896

File tree

3 files changed

+245
-5
lines changed

3 files changed

+245
-5
lines changed

collector/cluster_info.go

+109
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// Copyright 2022 The Prometheus Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package collector
15+
16+
import (
17+
"context"
18+
"encoding/json"
19+
"io/ioutil"
20+
"net/http"
21+
"net/url"
22+
23+
"github.com/blang/semver"
24+
"github.com/go-kit/log"
25+
"github.com/prometheus/client_golang/prometheus"
26+
)
27+
28+
func init() {
29+
registerCollector("cluster-info", defaultEnabled, NewClusterInfo)
30+
}
31+
32+
type ClusterInfoCollector struct {
33+
logger log.Logger
34+
u *url.URL
35+
hc *http.Client
36+
}
37+
38+
func NewClusterInfo(logger log.Logger, u *url.URL, hc *http.Client) (Collector, error) {
39+
return &ClusterInfoCollector{
40+
logger: logger,
41+
u: u,
42+
hc: hc,
43+
}, nil
44+
}
45+
46+
var clusterInfoDesc = map[string]*prometheus.Desc{
47+
"version": prometheus.NewDesc(
48+
prometheus.BuildFQName(namespace, "", "version"),
49+
"Elasticsearch version information.",
50+
[]string{
51+
"cluster",
52+
"cluster_uuid",
53+
"build_date",
54+
"build_hash",
55+
"version",
56+
"lucene_version",
57+
},
58+
nil,
59+
),
60+
}
61+
62+
// ClusterInfoResponse is the cluster info retrievable from the / endpoint
63+
type ClusterInfoResponse struct {
64+
Name string `json:"name"`
65+
ClusterName string `json:"cluster_name"`
66+
ClusterUUID string `json:"cluster_uuid"`
67+
Version VersionInfo `json:"version"`
68+
Tagline string `json:"tagline"`
69+
}
70+
71+
// VersionInfo is the version info retrievable from the / endpoint, embedded in ClusterInfoResponse
72+
type VersionInfo struct {
73+
Number semver.Version `json:"number"`
74+
BuildHash string `json:"build_hash"`
75+
BuildDate string `json:"build_date"`
76+
BuildSnapshot bool `json:"build_snapshot"`
77+
LuceneVersion semver.Version `json:"lucene_version"`
78+
}
79+
80+
func (c *ClusterInfoCollector) Update(ctx context.Context, ch chan<- prometheus.Metric) error {
81+
resp, err := c.hc.Get(c.u.String())
82+
if err != nil {
83+
return err
84+
}
85+
defer resp.Body.Close()
86+
b, err := ioutil.ReadAll(resp.Body)
87+
if err != nil {
88+
return err
89+
}
90+
var info ClusterInfoResponse
91+
err = json.Unmarshal(b, &info)
92+
if err != nil {
93+
return err
94+
}
95+
96+
ch <- prometheus.MustNewConstMetric(
97+
clusterInfoDesc["version"],
98+
prometheus.GaugeValue,
99+
1,
100+
info.ClusterName,
101+
info.ClusterUUID,
102+
info.Version.BuildDate,
103+
info.Version.BuildHash,
104+
info.Version.Number.String(),
105+
info.Version.LuceneVersion.String(),
106+
)
107+
108+
return nil
109+
}

collector/collector.go

+122-5
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ package collector
1616

1717
import (
1818
"context"
19+
"errors"
20+
"fmt"
1921
"net/http"
2022
"net/url"
2123
"sync"
@@ -24,10 +26,26 @@ import (
2426
"github.com/go-kit/log"
2527
"github.com/go-kit/log/level"
2628
"github.com/prometheus/client_golang/prometheus"
29+
"gopkg.in/alecthomas/kingpin.v2"
2730
)
2831

29-
// Namespace defines the common namespace to be used by all metrics.
30-
const namespace = "elasticsearch"
32+
const (
33+
// Namespace defines the common namespace to be used by all metrics.
34+
namespace = "elasticsearch"
35+
36+
defaultEnabled = true
37+
// defaultDisabled = false
38+
)
39+
40+
type factoryFunc func(logger log.Logger, u *url.URL, hc *http.Client) (Collector, error)
41+
42+
var (
43+
factories = make(map[string]factoryFunc)
44+
initiatedCollectorsMtx = sync.Mutex{}
45+
initiatedCollectors = make(map[string]Collector)
46+
collectorState = make(map[string]*bool)
47+
forcedCollectors = map[string]bool{} // collectors which have been explicitly enabled or disabled
48+
)
3149

3250
var (
3351
scrapeDurationDesc = prometheus.NewDesc(
@@ -50,16 +68,92 @@ type Collector interface {
5068
Update(context.Context, chan<- prometheus.Metric) error
5169
}
5270

71+
func registerCollector(name string, isDefaultEnabled bool, createFunc factoryFunc) {
72+
var helpDefaultState string
73+
if isDefaultEnabled {
74+
helpDefaultState = "enabled"
75+
} else {
76+
helpDefaultState = "disabled"
77+
}
78+
79+
// Create flag for this collector
80+
flagName := fmt.Sprintf("collector.%s", name)
81+
flagHelp := fmt.Sprintf("Enable the %s collector (default: %s).", name, helpDefaultState)
82+
defaultValue := fmt.Sprintf("%v", isDefaultEnabled)
83+
84+
flag := kingpin.Flag(flagName, flagHelp).Default(defaultValue).Action(collectorFlagAction(name)).Bool()
85+
collectorState[name] = flag
86+
87+
// Register the create function for this collector
88+
factories[name] = createFunc
89+
}
90+
5391
type ElasticsearchCollector struct {
5492
Collectors map[string]Collector
5593
logger log.Logger
94+
esURL *url.URL
95+
httpClient *http.Client
5696
}
5797

98+
type Option func(*ElasticsearchCollector) error
99+
58100
// NewElasticsearchCollector creates a new ElasticsearchCollector
59-
func NewElasticsearchCollector(logger log.Logger, httpClient *http.Client, esURL *url.URL) (*ElasticsearchCollector, error) {
101+
func NewElasticsearchCollector(logger log.Logger, filters []string, options ...Option) (*ElasticsearchCollector, error) {
102+
e := &ElasticsearchCollector{logger: logger}
103+
// Apply options to customize the collector
104+
for _, o := range options {
105+
if err := o(e); err != nil {
106+
return nil, err
107+
}
108+
}
109+
110+
f := make(map[string]bool)
111+
for _, filter := range filters {
112+
enabled, exist := collectorState[filter]
113+
if !exist {
114+
return nil, fmt.Errorf("missing collector: %s", filter)
115+
}
116+
if !*enabled {
117+
return nil, fmt.Errorf("disabled collector: %s", filter)
118+
}
119+
f[filter] = true
120+
}
60121
collectors := make(map[string]Collector)
122+
initiatedCollectorsMtx.Lock()
123+
defer initiatedCollectorsMtx.Unlock()
124+
for key, enabled := range collectorState {
125+
if !*enabled || (len(f) > 0 && !f[key]) {
126+
continue
127+
}
128+
if collector, ok := initiatedCollectors[key]; ok {
129+
collectors[key] = collector
130+
} else {
131+
collector, err := factories[key](log.With(logger, "collector", key), e.esURL, e.httpClient)
132+
if err != nil {
133+
return nil, err
134+
}
135+
collectors[key] = collector
136+
initiatedCollectors[key] = collector
137+
}
138+
}
139+
140+
e.Collectors = collectors
141+
142+
return e, nil
143+
}
61144

62-
return &ElasticsearchCollector{Collectors: collectors, logger: logger}, nil
145+
func WithElasticsearchURL(esURL *url.URL) Option {
146+
return func(e *ElasticsearchCollector) error {
147+
e.esURL = esURL
148+
return nil
149+
}
150+
}
151+
152+
func WithHTTPClient(hc *http.Client) Option {
153+
return func(e *ElasticsearchCollector) error {
154+
e.httpClient = hc
155+
return nil
156+
}
63157
}
64158

65159
// Describe implements the prometheus.Collector interface.
@@ -89,7 +183,11 @@ func execute(ctx context.Context, name string, c Collector, ch chan<- prometheus
89183
var success float64
90184

91185
if err != nil {
92-
_ = level.Error(logger).Log("msg", "collector failed", "name", name, "duration_seconds", duration.Seconds(), "err", err)
186+
if IsNoDataError(err) {
187+
_ = level.Debug(logger).Log("msg", "collector returned no data", "name", name, "duration_seconds", duration.Seconds(), "err", err)
188+
} else {
189+
_ = level.Error(logger).Log("msg", "collector failed", "name", name, "duration_seconds", duration.Seconds(), "err", err)
190+
}
93191
success = 0
94192
} else {
95193
_ = level.Debug(logger).Log("msg", "collector succeeded", "name", name, "duration_seconds", duration.Seconds())
@@ -98,3 +196,22 @@ func execute(ctx context.Context, name string, c Collector, ch chan<- prometheus
98196
ch <- prometheus.MustNewConstMetric(scrapeDurationDesc, prometheus.GaugeValue, duration.Seconds(), name)
99197
ch <- prometheus.MustNewConstMetric(scrapeSuccessDesc, prometheus.GaugeValue, success, name)
100198
}
199+
200+
// collectorFlagAction generates a new action function for the given collector
201+
// to track whether it has been explicitly enabled or disabled from the command line.
202+
// A new action function is needed for each collector flag because the ParseContext
203+
// does not contain information about which flag called the action.
204+
// See: https://github.com/alecthomas/kingpin/issues/294
205+
func collectorFlagAction(collector string) func(ctx *kingpin.ParseContext) error {
206+
return func(ctx *kingpin.ParseContext) error {
207+
forcedCollectors[collector] = true
208+
return nil
209+
}
210+
}
211+
212+
// ErrNoData indicates the collector found no data to collect, but had no other error.
213+
var ErrNoData = errors.New("collector returned no data")
214+
215+
func IsNoDataError(err error) bool {
216+
return err == ErrNoData
217+
}

main.go

+14
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,20 @@ func main() {
160160
// version metric
161161
prometheus.MustRegister(version.NewCollector(name))
162162

163+
// create the exporter
164+
exporter, err := collector.NewElasticsearchCollector(
165+
logger,
166+
[]string{},
167+
collector.WithElasticsearchURL(esURL),
168+
collector.WithHTTPClient(httpClient),
169+
)
170+
if err != nil {
171+
_ = level.Error(logger).Log("msg", "failed to create Elasticsearch collector", "err", err)
172+
os.Exit(1)
173+
}
174+
prometheus.MustRegister(exporter)
175+
176+
// TODO(@sysadmind): Remove this when we have a better way to get the cluster name to down stream collectors.
163177
// cluster info retriever
164178
clusterInfoRetriever := clusterinfo.New(logger, httpClient, esURL, *esClusterInfoInterval)
165179

0 commit comments

Comments
 (0)