From c982af2622fd7a354d7875c4a37ec3abad42b735 Mon Sep 17 00:00:00 2001 From: Aaron Delaney Date: Thu, 28 Sep 2023 11:39:12 +0100 Subject: [PATCH] collector: use collector interface for tasks Signed-off-by: Aaron Delaney --- collector/cluster_settings_test.go | 13 --- collector/collector_test.go | 36 ++++++ collector/tasks.go | 173 ++++++++++++----------------- collector/tasks_response.go | 39 ------- collector/tasks_test.go | 77 ++++++------- main.go | 10 -- 6 files changed, 148 insertions(+), 200 deletions(-) create mode 100644 collector/collector_test.go delete mode 100644 collector/tasks_response.go diff --git a/collector/cluster_settings_test.go b/collector/cluster_settings_test.go index b2134318..bfe6653c 100644 --- a/collector/cluster_settings_test.go +++ b/collector/cluster_settings_test.go @@ -14,7 +14,6 @@ package collector import ( - "context" "io" "net/http" "net/http/httptest" @@ -24,21 +23,9 @@ import ( "testing" "github.com/go-kit/log" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" ) -type wrapCollector struct { - c Collector -} - -func (w wrapCollector) Describe(ch chan<- *prometheus.Desc) { -} - -func (w wrapCollector) Collect(ch chan<- prometheus.Metric) { - w.c.Update(context.Background(), ch) -} - func TestClusterSettingsStats(t *testing.T) { // Testcases created using: // docker run -d -p 9200:9200 elasticsearch:VERSION-alpine diff --git a/collector/collector_test.go b/collector/collector_test.go new file mode 100644 index 00000000..80c7fa5d --- /dev/null +++ b/collector/collector_test.go @@ -0,0 +1,36 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + + "github.com/prometheus/client_golang/prometheus" +) + +// wrapCollector is a util to let you test your Collector implementation. +// +// Use this with prometheus/client_golang/prometheus/testutil to test metric output, for example: +// +// testutil.CollectAndCompare(wrapCollector{c}, strings.NewReader(want)) +type wrapCollector struct { + c Collector +} + +func (w wrapCollector) Describe(_ chan<- *prometheus.Desc) { +} + +func (w wrapCollector) Collect(ch chan<- prometheus.Metric) { + w.c.Update(context.Background(), ch) +} diff --git a/collector/tasks.go b/collector/tasks.go index 5f14295a..85518a92 100644 --- a/collector/tasks.go +++ b/collector/tasks.go @@ -14,97 +14,80 @@ package collector import ( + "context" "encoding/json" "fmt" "io" "net/http" "net/url" - "path" + "github.com/alecthomas/kingpin/v2" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" ) -type taskByAction struct { - Type prometheus.ValueType - Desc *prometheus.Desc - Value func(action string, count int64) float64 - Labels func(action string, count int64) []string -} - -var ( - taskLabels = []string{"cluster", "action"} -) +// filterByTask global required because collector interface doesn't expose any way to take +// constructor args. +var actionFilter string -// Task Information Struct -type Task struct { - logger log.Logger - client *http.Client - url *url.URL - actions string +var taskActionDesc = prometheus.NewDesc( + prometheus.BuildFQName(namespace, "task_stats", "action_total"), + "Number of tasks of a certain action", + []string{"action"}, nil) - up prometheus.Gauge - totalScrapes, jsonParseFailures prometheus.Counter +func init() { + kingpin.Flag("tasks.actions", + "Filter on task actions. Used in same way as Task API actions param"). + Default("indices:*").StringVar(&actionFilter) + registerCollector("tasks", defaultDisabled, NewTaskCollector) +} - byActionMetrics []*taskByAction +// Task Information Struct +type TaskCollector struct { + logger log.Logger + hc *http.Client + u *url.URL } -// NewTask defines Task Prometheus metrics -func NewTask(logger log.Logger, client *http.Client, url *url.URL, actions string) *Task { - return &Task{ - logger: logger, - client: client, - url: url, - actions: actions, - - up: prometheus.NewGauge(prometheus.GaugeOpts{ - Name: prometheus.BuildFQName(namespace, "task_stats", "up"), - Help: "Was the last scrape of the ElasticSearch Task endpoint successful.", - }), - totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{ - Name: prometheus.BuildFQName(namespace, "task_stats", "total_scrapes"), - Help: "Current total Elasticsearch snapshots scrapes.", - }), - jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{ - Name: prometheus.BuildFQName(namespace, "task_stats", "json_parse_failures"), - Help: "Number of errors while parsing JSON.", - }), - byActionMetrics: []*taskByAction{ - { - Type: prometheus.GaugeValue, - Desc: prometheus.NewDesc( - prometheus.BuildFQName(namespace, "task_stats", "action_total"), - "Number of tasks of a certain action", - []string{"action"}, nil, - ), - Value: func(action string, count int64) float64 { - return float64(count) - }, - Labels: func(action string, count int64) []string { - return []string{action} - }, - }, - }, - } +// NewTaskCollector defines Task Prometheus metrics +func NewTaskCollector(logger log.Logger, u *url.URL, hc *http.Client) (Collector, error) { + level.Info(logger).Log("msg", "task collector created", + "actionFilter", actionFilter, + ) + + return &TaskCollector{ + logger: logger, + hc: hc, + u: u, + }, nil } -// Describe adds Task metrics descriptions -func (t *Task) Describe(ch chan<- *prometheus.Desc) { - for _, metric := range t.byActionMetrics { - ch <- metric.Desc +func (t *TaskCollector) Update(ctx context.Context, ch chan<- prometheus.Metric) error { + stats, err := t.fetchAndDecodeAndAggregateTaskStats() + if err != nil { + err = fmt.Errorf("failed to fetch and decode task stats: %w", err) + return err } - - ch <- t.up.Desc() - ch <- t.totalScrapes.Desc() - ch <- t.jsonParseFailures.Desc() + for action, count := range stats.CountByAction { + ch <- prometheus.MustNewConstMetric( + taskActionDesc, + prometheus.GaugeValue, + float64(count), + action, + ) + } + return nil } -func (t *Task) fetchAndDecodeAndAggregateTaskStats() (*AggregatedTaskStats, error) { - u := *t.url - u.Path = path.Join(u.Path, "/_tasks") - u.RawQuery = "group_by=none&actions=" + t.actions - res, err := t.client.Get(u.String()) +func (t *TaskCollector) fetchAndDecodeAndAggregateTaskStats() (*AggregatedTaskStats, error) { + u := t.u.ResolveReference(&url.URL{Path: "_tasks"}) + q := u.Query() + q.Set("group_by", "none") + q.Set("actions", actionFilter) + u.RawQuery = q.Encode() + + res, err := t.hc.Get(u.String()) if err != nil { return nil, fmt.Errorf("failed to get data stream stats health from %s://%s:%s%s: %s", u.Scheme, u.Hostname(), u.Port(), u.Path, err) @@ -126,13 +109,11 @@ func (t *Task) fetchAndDecodeAndAggregateTaskStats() (*AggregatedTaskStats, erro bts, err := io.ReadAll(res.Body) if err != nil { - t.jsonParseFailures.Inc() return nil, err } var tr TasksResponse if err := json.Unmarshal(bts, &tr); err != nil { - t.jsonParseFailures.Inc() return nil, err } @@ -140,35 +121,27 @@ func (t *Task) fetchAndDecodeAndAggregateTaskStats() (*AggregatedTaskStats, erro return stats, nil } -// Collect gets Task metric values -func (ds *Task) Collect(ch chan<- prometheus.Metric) { - ds.totalScrapes.Inc() - defer func() { - ch <- ds.up - ch <- ds.totalScrapes - ch <- ds.jsonParseFailures - }() +// TasksResponse is a representation of the Task management API. +type TasksResponse struct { + Tasks []TaskResponse `json:"tasks"` +} - stats, err := ds.fetchAndDecodeAndAggregateTaskStats() - if err != nil { - ds.up.Set(0) - level.Warn(ds.logger).Log( - "msg", "failed to fetch and decode task stats", - "err", err, - ) - return - } +// TaskResponse is a representation of the individual task item returned by task API endpoint. +// +// We only parse a very limited amount of this API for use in aggregation. +type TaskResponse struct { + Action string `json:"action"` +} - for action, count := range stats.CountByAction { - for _, metric := range ds.byActionMetrics { - ch <- prometheus.MustNewConstMetric( - metric.Desc, - metric.Type, - metric.Value(action, count), - metric.Labels(action, count)..., - ) - } - } +type AggregatedTaskStats struct { + CountByAction map[string]int64 +} - ds.up.Set(1) +func AggregateTasks(t TasksResponse) *AggregatedTaskStats { + actions := map[string]int64{} + for _, task := range t.Tasks { + actions[task.Action] += 1 + } + agg := &AggregatedTaskStats{CountByAction: actions} + return agg } diff --git a/collector/tasks_response.go b/collector/tasks_response.go deleted file mode 100644 index 3209cd17..00000000 --- a/collector/tasks_response.go +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright 2023 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package collector - -// TasksResponse is a representation of the Task management API. -type TasksResponse struct { - Tasks []TaskResponse `json:"tasks"` -} - -// TaskResponse is a representation of the individual task item returned by task API endpoint. -// -// We only parse a very limited amount of this API for use in aggregation. -type TaskResponse struct { - Action string `json:"action"` -} - -type AggregatedTaskStats struct { - CountByAction map[string]int64 -} - -func AggregateTasks(t TasksResponse) *AggregatedTaskStats { - actions := map[string]int64{} - for _, task := range t.Tasks { - actions[task.Action] += 1 - } - agg := &AggregatedTaskStats{CountByAction: actions} - return agg -} diff --git a/collector/tasks_test.go b/collector/tasks_test.go index 35f49339..0db500d3 100644 --- a/collector/tasks_test.go +++ b/collector/tasks_test.go @@ -18,60 +18,61 @@ import ( "net/http" "net/http/httptest" "net/url" + "strings" "testing" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus/testutil" ) func TestTasks(t *testing.T) { // Test data was collected by running the following: + // # create container // docker run -d --name elasticsearch -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.17.11 // sleep 15 - // # start some busy work - // for i in $(seq 1 1000); do \ - // curl -o /dev/null -s -X POST "localhost:9200/a1/_doc" -H 'Content-Type: application/json' \ - // -d'{"abc": "'$i'"}'; done & - // curl -X POST "localhost:9200/a1/_delete_by_query?requests_per_second=1&wait_for_completion=false" \ - // -H 'Content-Type: application/json' -d'{"query": {"match_all": {}}} + // # start some busy work in background + // for i in $(seq 1 500) + // do + // curl -o /dev/null -sX POST "localhost:9200/a1/_doc" -H 'Content-Type: application/json' -d'{"a1": "'"$i"'"}' + // sleep .01 + // curl -o /dev/null -sX POST "localhost:9200/a1/_doc" -H 'Content-Type: application/json' -d'{"a2": "'"$i"'"}' + // sleep .01 + // curl -o /dev/null -sX POST "localhost:9200/a1/_doc" -H 'Content-Type: application/json' -d'{"a3": "'"$i"'"}' + // sleep .01 + // done & // # try and collect a good sample // curl -X GET 'localhost:9200/_tasks?group_by=none&actions=indices:*' - // docker rm elasticsearch + // # cleanup + // docker rm --force elasticsearch tcs := map[string]string{ - "7.17": `{"tasks":[{"node":"NVe9ksxcSu6AJTKlIfI24A","id":17223,"type":"transport","action":"indices:data/write/delete/byquery","start_time_in_millis":1695214684290,"running_time_in_nanos":8003510219,"cancellable":true,"cancelled":false,"headers":{}},{"node":"NVe9ksxcSu6AJTKlIfI24A","id":20890,"type":"transport","action":"indices:data/write/index","start_time_in_millis":1695214692292,"running_time_in_nanos":1611966,"cancellable":false,"headers":{}},{"node":"NVe9ksxcSu6AJTKlIfI24A","id":20891,"type":"transport","action":"indices:data/write/bulk[s]","start_time_in_millis":1695214692292,"running_time_in_nanos":1467298,"cancellable":false,"parent_task_id":"NVe9ksxcSu6AJTKlIfI24A:20890","headers":{}},{"node":"NVe9ksxcSu6AJTKlIfI24A","id":20892,"type":"direct","action":"indices:data/write/bulk[s][p]","start_time_in_millis":1695214692292,"running_time_in_nanos":1437170,"cancellable":false,"parent_task_id":"NVe9ksxcSu6AJTKlIfI24A:20891","headers":{}}]}`, + "7.17": `{"tasks":[{"node":"9lWCm1y_QkujaAg75bVx7A","id":70,"type":"transport","action":"indices:admin/index_template/put","start_time_in_millis":1695900464655,"running_time_in_nanos":308640039,"cancellable":false,"headers":{}},{"node":"9lWCm1y_QkujaAg75bVx7A","id":73,"type":"transport","action":"indices:admin/index_template/put","start_time_in_millis":1695900464683,"running_time_in_nanos":280672000,"cancellable":false,"headers":{}},{"node":"9lWCm1y_QkujaAg75bVx7A","id":76,"type":"transport","action":"indices:admin/index_template/put","start_time_in_millis":1695900464711,"running_time_in_nanos":253247906,"cancellable":false,"headers":{}},{"node":"9lWCm1y_QkujaAg75bVx7A","id":93,"type":"transport","action":"indices:admin/index_template/put","start_time_in_millis":1695900464904,"running_time_in_nanos":60230460,"cancellable":false,"headers":{}},{"node":"9lWCm1y_QkujaAg75bVx7A","id":50,"type":"transport","action":"indices:data/write/index","start_time_in_millis":1695900464229,"running_time_in_nanos":734480468,"cancellable":false,"headers":{}},{"node":"9lWCm1y_QkujaAg75bVx7A","id":51,"type":"transport","action":"indices:admin/auto_create","start_time_in_millis":1695900464235,"running_time_in_nanos":729223933,"cancellable":false,"headers":{}}]}`, } + want := `# HELP elasticsearch_task_stats_action_total Number of tasks of a certain action +# TYPE elasticsearch_task_stats_action_total gauge +elasticsearch_task_stats_action_total{action="indices:admin/auto_create"} 1 +elasticsearch_task_stats_action_total{action="indices:admin/index_template/put"} 4 +elasticsearch_task_stats_action_total{action="indices:data/write/index"} 1 +` for ver, out := range tcs { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { - fmt.Fprintln(w, out) - })) - defer ts.Close() + t.Run(ver, func(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + fmt.Fprintln(w, out) + })) + defer ts.Close() - u, err := url.Parse(ts.URL) - if err != nil { - t.Fatalf("Failed to parse URL: %s", err) - } + u, err := url.Parse(ts.URL) + if err != nil { + t.Fatalf("Failed to parse URL: %s", err) + } - task := NewTask(log.NewNopLogger(), http.DefaultClient, u, "indices:*") - stats, err := task.fetchAndDecodeAndAggregateTaskStats() - if err != nil { - t.Fatalf("Failed to fetch or decode data stream stats: %s", err) - } - t.Logf("[%s] Task Response: %+v", ver, stats) + c, err := NewTaskCollector(log.NewNopLogger(), u, ts.Client()) + if err != nil { + t.Fatalf("Failed to create collector: %v", err) + } - // validate actions aggregations - if len(stats.CountByAction) != 4 { - t.Fatal("expected to get 4 tasks") - } - if stats.CountByAction["indices:data/write/index"] != 1 { - t.Fatal("excpected action indices:data/write/delete/byquery to have count 1") - } - if stats.CountByAction["indices:data/write/bulk[s]"] != 1 { - t.Fatal("excpected action indices:data/write/bulk[s] to have count 1") - } - if stats.CountByAction["indices:data/write/bulk[s][p]"] != 1 { - t.Fatal("excpected action indices:data/write/bulk[s][p] to have count 1") - } - if stats.CountByAction["indices:data/write/delete/byquery"] != 1 { - t.Fatal("excpected action indices:data/write/delete/byquery to have count 1") - } + if err := testutil.CollectAndCompare(wrapCollector{c}, strings.NewReader(want)); err != nil { + t.Fatalf("Metrics did not match: %v", err) + } + }) } } diff --git a/main.go b/main.go index 91d14bde..e50dd09e 100644 --- a/main.go +++ b/main.go @@ -86,12 +86,6 @@ func main() { esExportSnapshots = kingpin.Flag("es.snapshots", "Export stats for the cluster snapshots."). Default("false").Bool() - esExportTasks = kingpin.Flag("es.tasks", - "Aggregate stats for tasks in the cluster."). - Default("false").Bool() - esTaskActions = kingpin.Flag("es.tasks.actions", - "Filter on task actions. Used in same way as Task API actions param"). - Default("indices:*").String() esExportSLM = kingpin.Flag("es.slm", "Export stats for SLM snapshots."). Default("false").Bool() @@ -242,10 +236,6 @@ func main() { prometheus.MustRegister(collector.NewIlmIndicies(logger, httpClient, esURL)) } - if *esExportTasks { - prometheus.MustRegister(collector.NewTask(logger, httpClient, esURL, *esTaskActions)) - } - // Create a context that is cancelled on SIGKILL or SIGINT. ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) defer cancel()