Skip to content

Commit

Permalink
Split the exporter into multiple collectors (#65)
Browse files Browse the repository at this point in the history
* Delete all old .go files
* Add option not to use cgo in .promu.yml
* Add main.go, tls.go and create collector package with cluster_health.go
* Vendor go-kit dependencies
* Create first draft of nodes collector
* Move clusterHealthResponse into own file called cluster_health_response.go
* Move NodeStatsResponse into own file which is the old structs.go
* Add all metrics for indices per node
* Introduce nodeMetric{} that has 2 funcs to get values
Wrap prometheus.Desc to have it all in one place

* Add jvm_memory metrics to nodes collector
* Add gcCollectionMetrics, breakerMetrics, threadPoolMetrics & filesystemMetrics
* Add more missing counters to nodeMetrics
* Add missing process and transport node metrics
* Add indices_indexing & indices_merges subsystem metrics
* Add indices_refresh subsystem metrics
* Add missing elasticsearch_indices_store_throttle_time_seconds_total metric
* Update collector/cluster_health.go to use the same pattern as collector/nodes.go
* Iterate over all colors to create metric with each color as label for status
  • Loading branch information
metalmatze authored and dominikschulz committed Jun 28, 2017
1 parent 9944051 commit d37917b
Show file tree
Hide file tree
Showing 35 changed files with 4,521 additions and 661 deletions.
2 changes: 2 additions & 0 deletions .promu.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
go:
cgo: false
repository:
path: github.com/justwatchcom/elasticsearch_exporter
build:
Expand Down
243 changes: 243 additions & 0 deletions collector/cluster_health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
package collector

import (
"encoding/json"
"net/http"
"net/url"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
)

const (
// namespace is passed as the namespace argument to prometheus.BuildFQName
// when the cluster health metric names are built.
namespace = "elasticsearch"
)

var (
// colors lists the status values that Collect iterates over; one status
// sample is emitted per entry, labelled with that color.
colors = []string{"green", "yellow", "red"}
// defaultClusterHealthLabels holds the variable label names used by the
// per-cluster gauge descriptors created in NewClusterHealth.
defaultClusterHealthLabels = []string{"cluster"}
)

// clusterHealthMetric bundles everything needed to export one value taken
// from a cluster health response: the Prometheus value type, the descriptor,
// and a function that extracts the sample value from the decoded response.
type clusterHealthMetric struct {
// Type is the value type handed to prometheus.MustNewConstMetric.
Type prometheus.ValueType
// Desc is the descriptor sent by Describe and used when building the sample.
Desc *prometheus.Desc
// Value computes the sample value from a decoded cluster health response.
Value func(clusterHealth clusterHealthResponse) float64
}

// clusterHealthStatusMetric describes the status metric, which is exported
// once per color rather than once per response.
type clusterHealthStatusMetric struct {
// Type is the value type handed to prometheus.MustNewConstMetric.
Type prometheus.ValueType
// Desc is the descriptor sent by Describe and used when building the samples.
Desc *prometheus.Desc
// Value computes the sample value for the given color from a decoded
// cluster health response.
Value func(clusterHealth clusterHealthResponse, color string) float64
// Labels is declared with a (clusterName, color) signature.
// NOTE(review): NewClusterHealth does not set this field and Collect does
// not call it; it is nil in the code visible here — confirm whether it is
// still needed.
Labels func(clusterName, color string) []string
}

// ClusterHealth is a Prometheus collector (it provides Describe and Collect)
// that exports values read from the /_cluster/health endpoint.
type ClusterHealth struct {
// logger receives warnings when fetching or decoding the response fails.
logger log.Logger
// client performs the HTTP GET request in Collect.
client *http.Client
// url is the base URL; Collect sets its Path to "/_cluster/health".
url *url.URL

// metrics are the gauges exported with the cluster name as their only label value.
metrics []*clusterHealthMetric
// statusMetric is exported once for every entry in colors.
statusMetric *clusterHealthStatusMetric
}

// NewClusterHealth builds a ClusterHealth collector.
//
// logger receives warnings emitted during collection, client is used to
// perform the HTTP request, and url is the base URL of the Elasticsearch
// instance whose /_cluster/health endpoint is queried.
//
// The returned collector exports one gauge per numeric/boolean field of the
// cluster health response, labelled with the cluster name, plus a "status"
// gauge labelled with the cluster name and a color.
func NewClusterHealth(logger log.Logger, client *http.Client, url *url.URL) *ClusterHealth {
	const subsystem = "cluster_health"

	// gauge assembles a cluster-labelled gauge so that every metric below is
	// declared with only the three things that actually differ between them:
	// its name, its help text and how its value is read from the response.
	gauge := func(name, help string, value func(clusterHealth clusterHealthResponse) float64) *clusterHealthMetric {
		return &clusterHealthMetric{
			Type: prometheus.GaugeValue,
			Desc: prometheus.NewDesc(
				prometheus.BuildFQName(namespace, subsystem, name),
				help,
				defaultClusterHealthLabels, nil,
			),
			Value: value,
		}
	}

	return &ClusterHealth{
		logger: logger,
		client: client,
		url:    url,

		metrics: []*clusterHealthMetric{
			gauge(
				"active_primary_shards",
				"The number of primary shards in your cluster. This is an aggregate total across all indices.",
				func(clusterHealth clusterHealthResponse) float64 {
					return float64(clusterHealth.ActivePrimaryShards)
				},
			),
			gauge(
				"active_shards",
				"Aggregate total of all shards across all indices, which includes replica shards.",
				func(clusterHealth clusterHealthResponse) float64 {
					return float64(clusterHealth.ActiveShards)
				},
			),
			gauge(
				"delayed_unassigned_shards",
				"The number of shards whose allocation has been delayed by the timeout settings.",
				func(clusterHealth clusterHealthResponse) float64 {
					return float64(clusterHealth.DelayedUnassignedShards)
				},
			),
			gauge(
				"initializing_shards",
				"Count of shards that are being freshly created.",
				func(clusterHealth clusterHealthResponse) float64 {
					return float64(clusterHealth.InitializingShards)
				},
			),
			gauge(
				"number_of_data_nodes",
				"Number of data nodes in the cluster.",
				func(clusterHealth clusterHealthResponse) float64 {
					return float64(clusterHealth.NumberOfDataNodes)
				},
			),
			gauge(
				"number_of_in_flight_fetch",
				"The number of ongoing shard info requests.",
				func(clusterHealth clusterHealthResponse) float64 {
					return float64(clusterHealth.NumberOfInFlightFetch)
				},
			),
			gauge(
				"number_of_nodes",
				"Number of nodes in the cluster.",
				func(clusterHealth clusterHealthResponse) float64 {
					return float64(clusterHealth.NumberOfNodes)
				},
			),
			gauge(
				"number_of_pending_tasks",
				"The number of cluster-level changes that have not yet been executed.",
				func(clusterHealth clusterHealthResponse) float64 {
					return float64(clusterHealth.NumberOfPendingTasks)
				},
			),
			gauge(
				"relocating_shards",
				"The number of shards that are currently moving from one node to another node.",
				func(clusterHealth clusterHealthResponse) float64 {
					return float64(clusterHealth.RelocatingShards)
				},
			),
			gauge(
				"timed_out",
				"Whether the cluster health request timed out before completing (1 for true, 0 for false).",
				func(clusterHealth clusterHealthResponse) float64 {
					// Booleans are exported as 1 (true) or 0 (false).
					if clusterHealth.TimedOut {
						return 1
					}
					return 0
				},
			),
			gauge(
				"unassigned_shards",
				"The number of shards that exist in the cluster state, but cannot be found in the cluster itself.",
				func(clusterHealth clusterHealthResponse) float64 {
					return float64(clusterHealth.UnassignedShards)
				},
			),
		},
		statusMetric: &clusterHealthStatusMetric{
			Type: prometheus.GaugeValue,
			Desc: prometheus.NewDesc(
				prometheus.BuildFQName(namespace, subsystem, "status"),
				"Whether all primary and replica shards are allocated.",
				[]string{"cluster", "color"}, nil,
			),
			// The sample is 1 for the color matching the reported status and
			// 0 for every other color.
			Value: func(clusterHealth clusterHealthResponse, color string) float64 {
				if clusterHealth.Status == color {
					return 1
				}
				return 0
			},
		},
	}
}

// Describe sends the descriptor of every cluster health gauge to ch, in the
// order they are stored, followed by the descriptor of the status metric.
func (c *ClusterHealth) Describe(ch chan<- *prometheus.Desc) {
	for i := range c.metrics {
		ch <- c.metrics[i].Desc
	}

	statusDesc := c.statusMetric.Desc
	ch <- statusDesc
}

// Collect requests /_cluster/health, decodes the JSON response and sends one
// sample per configured gauge plus one status sample per color to ch.
//
// On any failure (request error, non-200 response, undecodable body) a
// warning is logged and no samples are sent.
func (c *ClusterHealth) Collect(ch chan<- prometheus.Metric) {
	// Work on a copy of the base URL: writing the path into the shared
	// *url.URL would race when collections overlap and would leak the change
	// to whoever else holds the same pointer.
	u := *c.url
	u.Path = "/_cluster/health"
	endpoint := u.String()

	res, err := c.client.Get(endpoint)
	if err != nil {
		level.Warn(c.logger).Log(
			"msg", "failed to get cluster health",
			"url", endpoint,
			"err", err,
		)
		return
	}
	defer res.Body.Close()

	// http.Client.Get does not treat non-2xx responses as errors. Without
	// this check a JSON error body would decode into a zero-valued response
	// and be exported as real data with an empty cluster label.
	if res.StatusCode != http.StatusOK {
		level.Warn(c.logger).Log(
			"msg", "unexpected status code for cluster health",
			"url", endpoint,
			"status", res.StatusCode,
		)
		return
	}

	var health clusterHealthResponse
	if err := json.NewDecoder(res.Body).Decode(&health); err != nil {
		level.Warn(c.logger).Log(
			"msg", "failed to decode cluster health",
			"err", err,
		)
		return
	}

	for _, metric := range c.metrics {
		ch <- prometheus.MustNewConstMetric(
			metric.Desc,
			metric.Type,
			metric.Value(health),
			health.ClusterName,
		)
	}

	// The status is exported as one series per color so that each color can
	// be queried as a 0/1 gauge.
	for _, color := range colors {
		ch <- prometheus.MustNewConstMetric(
			c.statusMetric.Desc,
			c.statusMetric.Type,
			c.statusMetric.Value(health, color),
			health.ClusterName, color,
		)
	}
}
19 changes: 19 additions & 0 deletions collector/cluster_health_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package collector

// clusterHealthResponse is the target type into which the JSON body of the
// /_cluster/health response is decoded. Each field is mapped to the JSON key
// named in its struct tag.
type clusterHealthResponse struct {
// ClusterName is used as the "cluster" label value on every exported sample.
ClusterName string `json:"cluster_name"`
// Status is compared against each entry of colors to build the status samples.
Status string `json:"status"`
// TimedOut is exported as 1 when true and 0 when false.
TimedOut bool `json:"timed_out"`
NumberOfNodes int `json:"number_of_nodes"`
NumberOfDataNodes int `json:"number_of_data_nodes"`
ActivePrimaryShards int `json:"active_primary_shards"`
ActiveShards int `json:"active_shards"`
RelocatingShards int `json:"relocating_shards"`
InitializingShards int `json:"initializing_shards"`
UnassignedShards int `json:"unassigned_shards"`
DelayedUnassignedShards int `json:"delayed_unassigned_shards"`
NumberOfPendingTasks int `json:"number_of_pending_tasks"`
NumberOfInFlightFetch int `json:"number_of_in_flight_fetch"`
// NOTE(review): the two fields below are decoded, but no metric in
// NewClusterHealth reads them — confirm whether they should be exported.
TaskMaxWaitingInQueueMillis int `json:"task_max_waiting_in_queue_millis"`
ActiveShardsPercentAsNumber float64 `json:"active_shards_percent_as_number"`
}
Loading

0 comments on commit d37917b

Please sign in to comment.