Skip to content

Commit

Permalink
collector: use collector interface for tasks
Browse files Browse the repository at this point in the history
Signed-off-by: Aaron Delaney <[email protected]>
  • Loading branch information
devoxel committed Sep 28, 2023
1 parent dd0a9f3 commit c982af2
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 200 deletions.
13 changes: 0 additions & 13 deletions collector/cluster_settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package collector

import (
"context"
"io"
"net/http"
"net/http/httptest"
Expand All @@ -24,21 +23,9 @@ import (
"testing"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
)

type wrapCollector struct {
c Collector
}

func (w wrapCollector) Describe(ch chan<- *prometheus.Desc) {
}

func (w wrapCollector) Collect(ch chan<- prometheus.Metric) {
w.c.Update(context.Background(), ch)
}

func TestClusterSettingsStats(t *testing.T) {
// Testcases created using:
// docker run -d -p 9200:9200 elasticsearch:VERSION-alpine
Expand Down
36 changes: 36 additions & 0 deletions collector/collector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2023 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"context"

"github.com/prometheus/client_golang/prometheus"
)

// wrapCollector is a util to let you test your Collector implementation.
//
// Use this with prometheus/client_golang/prometheus/testutil to test metric output, for example:
//
// testutil.CollectAndCompare(wrapCollector{c}, strings.NewReader(want))
type wrapCollector struct {
c Collector
}

func (w wrapCollector) Describe(_ chan<- *prometheus.Desc) {
}

func (w wrapCollector) Collect(ch chan<- prometheus.Metric) {
w.c.Update(context.Background(), ch)
}
173 changes: 73 additions & 100 deletions collector/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,97 +14,80 @@
package collector

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"path"

"github.com/alecthomas/kingpin/v2"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
)

type taskByAction struct {
Type prometheus.ValueType
Desc *prometheus.Desc
Value func(action string, count int64) float64
Labels func(action string, count int64) []string
}

var (
taskLabels = []string{"cluster", "action"}
)
// filterByTask global required because collector interface doesn't expose any way to take
// constructor args.
var actionFilter string

// Task Information Struct
type Task struct {
logger log.Logger
client *http.Client
url *url.URL
actions string
var taskActionDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "task_stats", "action_total"),
"Number of tasks of a certain action",
[]string{"action"}, nil)

up prometheus.Gauge
totalScrapes, jsonParseFailures prometheus.Counter
func init() {
kingpin.Flag("tasks.actions",
"Filter on task actions. Used in same way as Task API actions param").
Default("indices:*").StringVar(&actionFilter)
registerCollector("tasks", defaultDisabled, NewTaskCollector)
}

byActionMetrics []*taskByAction
// Task Information Struct
type TaskCollector struct {
logger log.Logger
hc *http.Client
u *url.URL
}

// NewTask defines Task Prometheus metrics
func NewTask(logger log.Logger, client *http.Client, url *url.URL, actions string) *Task {
return &Task{
logger: logger,
client: client,
url: url,
actions: actions,

up: prometheus.NewGauge(prometheus.GaugeOpts{
Name: prometheus.BuildFQName(namespace, "task_stats", "up"),
Help: "Was the last scrape of the ElasticSearch Task endpoint successful.",
}),
totalScrapes: prometheus.NewCounter(prometheus.CounterOpts{
Name: prometheus.BuildFQName(namespace, "task_stats", "total_scrapes"),
Help: "Current total Elasticsearch snapshots scrapes.",
}),
jsonParseFailures: prometheus.NewCounter(prometheus.CounterOpts{
Name: prometheus.BuildFQName(namespace, "task_stats", "json_parse_failures"),
Help: "Number of errors while parsing JSON.",
}),
byActionMetrics: []*taskByAction{
{
Type: prometheus.GaugeValue,
Desc: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "task_stats", "action_total"),
"Number of tasks of a certain action",
[]string{"action"}, nil,
),
Value: func(action string, count int64) float64 {
return float64(count)
},
Labels: func(action string, count int64) []string {
return []string{action}
},
},
},
}
// NewTaskCollector defines Task Prometheus metrics
func NewTaskCollector(logger log.Logger, u *url.URL, hc *http.Client) (Collector, error) {
level.Info(logger).Log("msg", "task collector created",
"actionFilter", actionFilter,
)

return &TaskCollector{
logger: logger,
hc: hc,
u: u,
}, nil
}

// Describe adds Task metrics descriptions
func (t *Task) Describe(ch chan<- *prometheus.Desc) {
for _, metric := range t.byActionMetrics {
ch <- metric.Desc
func (t *TaskCollector) Update(ctx context.Context, ch chan<- prometheus.Metric) error {
stats, err := t.fetchAndDecodeAndAggregateTaskStats()
if err != nil {
err = fmt.Errorf("failed to fetch and decode task stats: %w", err)
return err
}

ch <- t.up.Desc()
ch <- t.totalScrapes.Desc()
ch <- t.jsonParseFailures.Desc()
for action, count := range stats.CountByAction {
ch <- prometheus.MustNewConstMetric(
taskActionDesc,
prometheus.GaugeValue,
float64(count),
action,
)
}
return nil
}

func (t *Task) fetchAndDecodeAndAggregateTaskStats() (*AggregatedTaskStats, error) {
u := *t.url
u.Path = path.Join(u.Path, "/_tasks")
u.RawQuery = "group_by=none&actions=" + t.actions
res, err := t.client.Get(u.String())
func (t *TaskCollector) fetchAndDecodeAndAggregateTaskStats() (*AggregatedTaskStats, error) {
u := t.u.ResolveReference(&url.URL{Path: "_tasks"})
q := u.Query()
q.Set("group_by", "none")
q.Set("actions", actionFilter)
u.RawQuery = q.Encode()

res, err := t.hc.Get(u.String())
if err != nil {
return nil, fmt.Errorf("failed to get data stream stats health from %s://%s:%s%s: %s",
u.Scheme, u.Hostname(), u.Port(), u.Path, err)
Expand All @@ -126,49 +109,39 @@ func (t *Task) fetchAndDecodeAndAggregateTaskStats() (*AggregatedTaskStats, erro

bts, err := io.ReadAll(res.Body)
if err != nil {
t.jsonParseFailures.Inc()
return nil, err
}

var tr TasksResponse
if err := json.Unmarshal(bts, &tr); err != nil {
t.jsonParseFailures.Inc()
return nil, err
}

stats := AggregateTasks(tr)
return stats, nil
}

// Collect gets Task metric values
func (ds *Task) Collect(ch chan<- prometheus.Metric) {
ds.totalScrapes.Inc()
defer func() {
ch <- ds.up
ch <- ds.totalScrapes
ch <- ds.jsonParseFailures
}()
// TasksResponse is a representation of the Task management API.
type TasksResponse struct {
Tasks []TaskResponse `json:"tasks"`
}

stats, err := ds.fetchAndDecodeAndAggregateTaskStats()
if err != nil {
ds.up.Set(0)
level.Warn(ds.logger).Log(
"msg", "failed to fetch and decode task stats",
"err", err,
)
return
}
// TaskResponse is a representation of the individual task item returned by task API endpoint.
//
// We only parse a very limited amount of this API for use in aggregation.
type TaskResponse struct {
Action string `json:"action"`
}

for action, count := range stats.CountByAction {
for _, metric := range ds.byActionMetrics {
ch <- prometheus.MustNewConstMetric(
metric.Desc,
metric.Type,
metric.Value(action, count),
metric.Labels(action, count)...,
)
}
}
type AggregatedTaskStats struct {
CountByAction map[string]int64
}

ds.up.Set(1)
func AggregateTasks(t TasksResponse) *AggregatedTaskStats {
actions := map[string]int64{}
for _, task := range t.Tasks {
actions[task.Action] += 1
}
agg := &AggregatedTaskStats{CountByAction: actions}
return agg
}
39 changes: 0 additions & 39 deletions collector/tasks_response.go

This file was deleted.

Loading

0 comments on commit c982af2

Please sign in to comment.