From 5aebee4559127f9fe1961bc75ffbb66ed4ff7531 Mon Sep 17 00:00:00 2001 From: Jaime Yera Hidalgo <106755265+jaimeyh@users.noreply.github.com> Date: Fri, 14 Jun 2024 11:47:24 +0200 Subject: [PATCH] Revert "Refactor snapshots collector (#789)" This reverts commit 34423407fa453a35c3899b7d906dafd568efe359. --- CHANGELOG.md | 8 - collector/snapshots.go | 395 ++++++++++++++++++++++-------------- collector/snapshots_test.go | 13 +- collector/util.go | 57 ------ main.go | 7 + 5 files changed, 253 insertions(+), 227 deletions(-) delete mode 100644 collector/util.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d9af75a..1cb10b4a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,11 +1,3 @@ -## master / unreleased - -BREAKING CHANGES: - -The flag `--es.snapshots` has been renamed to `--collector.snapshots`. - -* [CHANGE] Rename --es.snapshots to --collector.snapshots #XXX - ## 1.6.0 / 2023-06-22 BREAKING CHANGES: diff --git a/collector/snapshots.go b/collector/snapshots.go index cb6a894a..9fc15216 100644 --- a/collector/snapshots.go +++ b/collector/snapshots.go @@ -14,17 +14,32 @@ package collector import ( - "context" "encoding/json" "fmt" + "io" "net/http" "net/url" "path" "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" ) +type snapshotMetric struct { + Type prometheus.ValueType + Desc *prometheus.Desc + Value func(snapshotStats SnapshotStatDataResponse) float64 + Labels func(repositoryName string, snapshotStats SnapshotStatDataResponse) []string +} + +type repositoryMetric struct { + Type prometheus.ValueType + Desc *prometheus.Desc + Value func(snapshotsStats SnapshotStatsResponse) float64 + Labels func(repositoryName string) []string +} + var ( defaultSnapshotLabels = []string{"repository", "state", "version"} defaultSnapshotLabelValues = func(repositoryName string, snapshotStats SnapshotStatDataResponse) []string { @@ -36,195 +51,261 @@ var ( } ) -var ( - numIndices = prometheus.NewDesc( - prometheus.BuildFQName(namespace, "snapshot_stats", "snapshot_number_of_indices"), - "Number of indices in the last snapshot", - defaultSnapshotLabels, nil, - ) - snapshotStartTimestamp = prometheus.NewDesc( - prometheus.BuildFQName(namespace, "snapshot_stats", "snapshot_start_time_timestamp"), - "Last snapshot start timestamp", - defaultSnapshotLabels, nil, - ) - snapshotEndTimestamp = prometheus.NewDesc( - prometheus.BuildFQName(namespace, "snapshot_stats", "snapshot_end_time_timestamp"), - "Last snapshot end timestamp", - defaultSnapshotLabels, nil, - ) - snapshotNumFailures = prometheus.NewDesc( - prometheus.BuildFQName(namespace, "snapshot_stats", "snapshot_number_of_failures"), - "Last snapshot number of failures", - defaultSnapshotLabels, nil, - ) - snapshotNumShards = prometheus.NewDesc( - prometheus.BuildFQName(namespace, "snapshot_stats", "snapshot_total_shards"), - "Last snapshot total shards", - defaultSnapshotLabels, nil, - ) - snapshotFailedShards = prometheus.NewDesc( - prometheus.BuildFQName(namespace, "snapshot_stats", "snapshot_failed_shards"), - "Last snapshot failed shards", - defaultSnapshotLabels, nil, - ) - snapshotSuccessfulShards = prometheus.NewDesc( - prometheus.BuildFQName(namespace, "snapshot_stats", "snapshot_successful_shards"), - "Last snapshot successful shards", - defaultSnapshotLabels, nil, - ) - - numSnapshots = prometheus.NewDesc( - prometheus.BuildFQName(namespace, "snapshot_stats", "number_of_snapshots"), - "Number of snapshots in a repository", - defaultSnapshotRepositoryLabels, nil, - ) - oldestSnapshotTimestamp = prometheus.NewDesc( - prometheus.BuildFQName(namespace, "snapshot_stats", "oldest_snapshot_timestamp"), - "Timestamp of the oldest snapshot", - defaultSnapshotRepositoryLabels, nil, - ) - latestSnapshotTimestamp = prometheus.NewDesc( - prometheus.BuildFQName(namespace, "snapshot_stats", "latest_snapshot_timestamp_seconds"), - "Timestamp of the latest SUCCESS or PARTIAL snapshot", - defaultSnapshotRepositoryLabels, nil, - ) -) - -func init() { - registerCollector("snapshots", defaultDisabled, NewSnapshots) -} - // Snapshots information struct type Snapshots struct { logger log.Logger - hc *http.Client - u *url.URL + client *http.Client + url *url.URL + + snapshotMetrics []*snapshotMetric + repositoryMetrics []*repositoryMetric } // NewSnapshots defines Snapshots Prometheus metrics -func NewSnapshots(logger log.Logger, u *url.URL, hc *http.Client) (Collector, error) { +func NewSnapshots(logger log.Logger, client *http.Client, url *url.URL) *Snapshots { return &Snapshots{ logger: logger, - u: u, - hc: hc, - }, nil + client: client, + url: url, + + snapshotMetrics: []*snapshotMetric{ + { + Type: prometheus.GaugeValue, + Desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, "snapshot_stats", "snapshot_number_of_indices"), + "Number of indices in the last snapshot", + defaultSnapshotLabels, nil, + ), + Value: func(snapshotStats SnapshotStatDataResponse) float64 { + return float64(len(snapshotStats.Indices)) + }, + Labels: defaultSnapshotLabelValues, + }, + { + Type: prometheus.GaugeValue, + Desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, "snapshot_stats", "snapshot_start_time_timestamp"), + "Last snapshot start timestamp", + defaultSnapshotLabels, nil, + ), + Value: func(snapshotStats SnapshotStatDataResponse) float64 { + return float64(snapshotStats.StartTimeInMillis / 1000) + }, + Labels: defaultSnapshotLabelValues, + }, + { + Type: prometheus.GaugeValue, + Desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, "snapshot_stats", "snapshot_end_time_timestamp"), + "Last snapshot end timestamp", + defaultSnapshotLabels, nil, + ), + Value: func(snapshotStats SnapshotStatDataResponse) float64 { + return float64(snapshotStats.EndTimeInMillis / 1000) + }, + Labels: defaultSnapshotLabelValues, + }, + { + Type: prometheus.GaugeValue, + Desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, "snapshot_stats", "snapshot_number_of_failures"), + "Last snapshot number of failures", + defaultSnapshotLabels, nil, + ), + Value: func(snapshotStats SnapshotStatDataResponse) float64 { + return float64(len(snapshotStats.Failures)) + }, + Labels: defaultSnapshotLabelValues, + }, + { + Type: prometheus.GaugeValue, + Desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, "snapshot_stats", "snapshot_total_shards"), + "Last snapshot total shards", + defaultSnapshotLabels, nil, + ), + Value: func(snapshotStats SnapshotStatDataResponse) float64 { + return float64(snapshotStats.Shards.Total) + }, + Labels: defaultSnapshotLabelValues, + }, + { + Type: prometheus.GaugeValue, + Desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, "snapshot_stats", "snapshot_failed_shards"), + "Last snapshot failed shards", + defaultSnapshotLabels, nil, + ), + Value: func(snapshotStats SnapshotStatDataResponse) float64 { + return float64(snapshotStats.Shards.Failed) + }, + Labels: defaultSnapshotLabelValues, + }, + { + Type: prometheus.GaugeValue, + Desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, "snapshot_stats", "snapshot_successful_shards"), + "Last snapshot successful shards", + defaultSnapshotLabels, nil, + ), + Value: func(snapshotStats SnapshotStatDataResponse) float64 { + return float64(snapshotStats.Shards.Successful) + }, + Labels: defaultSnapshotLabelValues, + }, + }, + repositoryMetrics: []*repositoryMetric{ + { + Type: prometheus.GaugeValue, + Desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, "snapshot_stats", "number_of_snapshots"), + "Number of snapshots in a repository", + defaultSnapshotRepositoryLabels, nil, + ), + Value: func(snapshotsStats SnapshotStatsResponse) float64 { + return float64(len(snapshotsStats.Snapshots)) + }, + Labels: defaultSnapshotRepositoryLabelValues, + }, + { + Type: prometheus.GaugeValue, + Desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, "snapshot_stats", "oldest_snapshot_timestamp"), + "Timestamp of the oldest snapshot", + defaultSnapshotRepositoryLabels, nil, + ), + Value: func(snapshotsStats SnapshotStatsResponse) float64 { + if len(snapshotsStats.Snapshots) == 0 { + return 0 + } + return float64(snapshotsStats.Snapshots[0].StartTimeInMillis / 1000) + }, + Labels: defaultSnapshotRepositoryLabelValues, + }, + { + Type: prometheus.GaugeValue, + Desc: prometheus.NewDesc( + prometheus.BuildFQName(namespace, "snapshot_stats", "latest_snapshot_timestamp_seconds"), + "Timestamp of the latest SUCCESS or PARTIAL snapshot", + defaultSnapshotRepositoryLabels, nil, + ), + Value: func(snapshotsStats SnapshotStatsResponse) float64 { + for i := len(snapshotsStats.Snapshots) - 1; i >= 0; i-- { + var snap = snapshotsStats.Snapshots[i] + if snap.State == "SUCCESS" || snap.State == "PARTIAL" { + return float64(snap.StartTimeInMillis / 1000) + } + } + return 0 + }, + Labels: defaultSnapshotRepositoryLabelValues, + }, + }, + } } -func (c *Snapshots) Update(ctx context.Context, ch chan<- prometheus.Metric) error { - // indices - snapshotsStatsResp := make(map[string]SnapshotStatsResponse) - u := c.u.ResolveReference(&url.URL{Path: "/_snapshot"}) +// Describe add Snapshots metrics descriptions +func (s *Snapshots) Describe(ch chan<- *prometheus.Desc) { + for _, metric := range s.snapshotMetrics { + ch <- metric.Desc + } + for _, metric := range s.repositoryMetrics { + ch <- metric.Desc + } - var srr SnapshotRepositoriesResponse - resp, err := getURL(ctx, c.hc, c.logger, u.String()) +} + +func (s *Snapshots) getAndParseURL(u *url.URL, data interface{}) error { + res, err := s.client.Get(u.String()) if err != nil { - return err + return fmt.Errorf("failed to get from %s://%s:%s%s: %s", + u.Scheme, u.Hostname(), u.Port(), u.Path, err) + } + + defer func() { + err = res.Body.Close() + if err != nil { + level.Warn(s.logger).Log( + "msg", "failed to close http.Client", + "err", err, + ) + } + }() + + if res.StatusCode != http.StatusOK { + return fmt.Errorf("HTTP Request failed with code %d", res.StatusCode) } - err = json.Unmarshal(resp, &srr) + bts, err := io.ReadAll(res.Body) if err != nil { - return fmt.Errorf("failed to unmarshal JSON: %v", err) + return err } + if err := json.Unmarshal(bts, data); err != nil { + return err + } + return nil +} + +func (s *Snapshots) fetchAndDecodeSnapshotsStats() (map[string]SnapshotStatsResponse, error) { + mssr := make(map[string]SnapshotStatsResponse) + + u := *s.url + u.Path = path.Join(u.Path, "/_snapshot") + var srr SnapshotRepositoriesResponse + err := s.getAndParseURL(&u, &srr) + if err != nil { + return nil, err + } for repository := range srr { - pathPart := path.Join("/_snapshot", repository, "/_all") - u := c.u.ResolveReference(&url.URL{Path: pathPart}) + u := *s.url + u.Path = path.Join(u.Path, "/_snapshot", repository, "/_all") var ssr SnapshotStatsResponse - resp, err := getURL(ctx, c.hc, c.logger, u.String()) + err := s.getAndParseURL(&u, &ssr) if err != nil { continue } - err = json.Unmarshal(resp, &ssr) - if err != nil { - return fmt.Errorf("failed to unmarshal JSON: %v", err) - } - snapshotsStatsResp[repository] = ssr + mssr[repository] = ssr } - // Snapshots stats - for repositoryName, snapshotStats := range snapshotsStatsResp { + return mssr, nil +} - ch <- prometheus.MustNewConstMetric( - numSnapshots, - prometheus.GaugeValue, - float64(len(snapshotStats.Snapshots)), - defaultSnapshotRepositoryLabelValues(repositoryName)..., - ) +// Collect gets Snapshots metric values +func (s *Snapshots) Collect(ch chan<- prometheus.Metric) { - oldest := float64(0) - if len(snapshotStats.Snapshots) > 0 { - oldest = float64(snapshotStats.Snapshots[0].StartTimeInMillis / 1000) - } - ch <- prometheus.MustNewConstMetric( - oldestSnapshotTimestamp, - prometheus.GaugeValue, - oldest, - defaultSnapshotRepositoryLabelValues(repositoryName)..., + // indices + snapshotsStatsResp, err := s.fetchAndDecodeSnapshotsStats() + if err != nil { + level.Warn(s.logger).Log( + "msg", "failed to fetch and decode snapshot stats", + "err", err, ) + return + } - latest := float64(0) - for i := len(snapshotStats.Snapshots) - 1; i >= 0; i-- { - var snap = snapshotStats.Snapshots[i] - if snap.State == "SUCCESS" || snap.State == "PARTIAL" { - latest = float64(snap.StartTimeInMillis / 1000) - break - } + // Snapshots stats + for repositoryName, snapshotStats := range snapshotsStatsResp { + for _, metric := range s.repositoryMetrics { + ch <- prometheus.MustNewConstMetric( + metric.Desc, + metric.Type, + metric.Value(snapshotStats), + metric.Labels(repositoryName)..., + ) } - ch <- prometheus.MustNewConstMetric( - latestSnapshotTimestamp, - prometheus.GaugeValue, - latest, - defaultSnapshotRepositoryLabelValues(repositoryName)..., - ) - if len(snapshotStats.Snapshots) == 0 { continue } lastSnapshot := snapshotStats.Snapshots[len(snapshotStats.Snapshots)-1] - ch <- prometheus.MustNewConstMetric( - numIndices, - prometheus.GaugeValue, - float64(len(lastSnapshot.Indices)), - defaultSnapshotLabelValues(repositoryName, lastSnapshot)..., - ) - ch <- prometheus.MustNewConstMetric( - snapshotStartTimestamp, - prometheus.GaugeValue, - float64(lastSnapshot.StartTimeInMillis/1000), - defaultSnapshotLabelValues(repositoryName, lastSnapshot)..., - ) - ch <- prometheus.MustNewConstMetric( - snapshotEndTimestamp, - prometheus.GaugeValue, - float64(lastSnapshot.EndTimeInMillis/1000), - defaultSnapshotLabelValues(repositoryName, lastSnapshot)..., - ) - ch <- prometheus.MustNewConstMetric( - snapshotNumFailures, - prometheus.GaugeValue, - float64(len(lastSnapshot.Failures)), - defaultSnapshotLabelValues(repositoryName, lastSnapshot)..., - ) - ch <- prometheus.MustNewConstMetric( - snapshotNumShards, - prometheus.GaugeValue, - float64(lastSnapshot.Shards.Total), - defaultSnapshotLabelValues(repositoryName, lastSnapshot)..., - ) - ch <- prometheus.MustNewConstMetric( - snapshotFailedShards, - prometheus.GaugeValue, - float64(lastSnapshot.Shards.Failed), - defaultSnapshotLabelValues(repositoryName, lastSnapshot)..., - ) - ch <- prometheus.MustNewConstMetric( - snapshotSuccessfulShards, - prometheus.GaugeValue, - float64(lastSnapshot.Shards.Successful), - defaultSnapshotLabelValues(repositoryName, lastSnapshot)..., - ) + for _, metric := range s.snapshotMetrics { + ch <- prometheus.MustNewConstMetric( + metric.Desc, + metric.Type, + metric.Value(lastSnapshot), + metric.Labels(repositoryName, lastSnapshot)..., + ) + } } - - return nil } diff --git a/collector/snapshots_test.go b/collector/snapshots_test.go index a4f88b11..c097183e 100644 --- a/collector/snapshots_test.go +++ b/collector/snapshots_test.go @@ -209,12 +209,15 @@ func TestSnapshots(t *testing.T) { t.Fatal(err) } - c, err := NewSnapshots(log.NewNopLogger(), u, http.DefaultClient) - if err != nil { - t.Fatal(err) - } + s := NewSnapshots(log.NewNopLogger(), http.DefaultClient, u) + + // TODO: Convert to collector interface + // c, err := NewSnapshots(log.NewNopLogger(), u, http.DefaultClient) + // if err != nil { + // t.Fatal(err) + // } - if err := testutil.CollectAndCompare(wrapCollector{c}, strings.NewReader(tt.want)); err != nil { + if err := testutil.CollectAndCompare(s, strings.NewReader(tt.want)); err != nil { t.Fatal(err) } }) diff --git a/collector/util.go b/collector/util.go deleted file mode 100644 index 19c045cd..00000000 --- a/collector/util.go +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright 2023 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package collector - -import ( - "context" - "fmt" - "io" - "net/http" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" -) - -func getURL(ctx context.Context, hc *http.Client, log log.Logger, u string) ([]byte, error) { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil) - if err != nil { - return nil, err - } - - resp, err := hc.Do(req) - if err != nil { - return nil, fmt.Errorf("failed to get %s: %v", u, err) - } - - defer func() { - err = resp.Body.Close() - if err != nil { - level.Warn(log).Log( - "msg", "failed to close response body", - "err", err, - ) - } - }() - - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("HTTP Request failed with code %d", resp.StatusCode) - } - - b, err := io.ReadAll(resp.Body) - if err != nil { - return nil, err - } - - return b, nil -} diff --git a/main.go b/main.go index c7db1def..e50dd09e 100644 --- a/main.go +++ b/main.go @@ -83,6 +83,9 @@ func main() { esExportShards = kingpin.Flag("es.shards", "Export stats for shards in the cluster (implies --es.indices)."). Default("false").Bool() + esExportSnapshots = kingpin.Flag("es.snapshots", + "Export stats for the cluster snapshots."). + Default("false").Bool() esExportSLM = kingpin.Flag("es.slm", "Export stats for SLM snapshots."). Default("false").Bool() @@ -208,6 +211,10 @@ func main() { } } + if *esExportSnapshots { + prometheus.MustRegister(collector.NewSnapshots(logger, httpClient, esURL)) + } + if *esExportSLM { prometheus.MustRegister(collector.NewSLM(logger, httpClient, esURL)) }