Skip to content

Commit 95ab81e

Browse files
ruflinjsoriano
authored andcommitted
Add x-pack data for Elasticsearch shard metricset (#7097)
* Introduces GetNodeInfo method to fetch additional info about the node. This should become obsolete in the future. * Refactor shard metricset to use module level hostParser and metricset. The xpack feature works but will need further testing with new builds of Elasticsearch. The plan is to test all xpack metricsets together when they are all done and do further tweaks.
1 parent 16015b5 commit 95ab81e

File tree

4 files changed

+127
-29
lines changed

4 files changed

+127
-29
lines changed

metricbeat/module/elasticsearch/elasticsearch.go

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,21 @@ type Info struct {
1717
ClusterID string `json:"cluster_uuid"`
1818
}
1919

20+
// NodeInfo struct cotains data about the node
21+
type NodeInfo struct {
22+
Host string `json:"host"`
23+
TransportAddress string `json:"transport_address"`
24+
IP string `json:"ip"`
25+
Name string `json:"name"`
26+
}
27+
2028
// GetClusterID fetches cluster id for given nodeID
2129
func GetClusterID(http *helper.HTTP, uri string, nodeID string) (string, error) {
2230
// Check if cluster id already cached. If yes, return it.
2331
if clusterID, ok := clusterIDCache[nodeID]; ok {
2432
return clusterID, nil
2533
}
2634

27-
// Makes sure the http uri is reset to its inital value
28-
defer http.SetURI(uri)
29-
3035
info, err := GetInfo(http, uri)
3136
if err != nil {
3237
return "", err
@@ -44,8 +49,6 @@ func GetClusterID(http *helper.HTTP, uri string, nodeID string) (string, error)
4449
//
4550
// The two names are compared
4651
func IsMaster(http *helper.HTTP, uri string) (bool, error) {
47-
// Makes sure the http uri is reset to its inital value
48-
defer http.SetURI(uri)
4952

5053
node, err := getNodeName(http, uri)
5154
if err != nil {
@@ -97,6 +100,7 @@ func getMasterName(http *helper.HTTP, uri string) (string, error) {
97100

98101
// GetInfo returns the data for the Elasticsearch / endpoint
99102
func GetInfo(http *helper.HTTP, uri string) (*Info, error) {
103+
defer http.SetURI(uri)
100104

101105
// Parses the uri to replace the path
102106
u, _ := url.Parse(uri)
@@ -116,6 +120,8 @@ func GetInfo(http *helper.HTTP, uri string) (*Info, error) {
116120
}
117121

118122
func fetchPath(http *helper.HTTP, uri, path string) ([]byte, error) {
123+
defer http.SetURI(uri)
124+
119125
// Parses the uri to replace the path
120126
u, _ := url.Parse(uri)
121127
u.Path = path
@@ -124,3 +130,26 @@ func fetchPath(http *helper.HTTP, uri, path string) ([]byte, error) {
124130
http.SetURI(u.String())
125131
return http.FetchContent()
126132
}
133+
134+
// GetNodeInfo returns the node information
135+
func GetNodeInfo(http *helper.HTTP, uri string, nodeID string) (*NodeInfo, error) {
136+
137+
content, err := fetchPath(http, uri, "/_nodes/_local/nodes")
138+
if err != nil {
139+
return nil, err
140+
}
141+
142+
nodesStruct := struct {
143+
Nodes map[string]*NodeInfo `json:"nodes"`
144+
}{}
145+
146+
json.Unmarshal(content, &nodesStruct)
147+
148+
// _local will only fetch one node info. First entry is node name
149+
for k, v := range nodesStruct.Nodes {
150+
if k == nodeID {
151+
return v, nil
152+
}
153+
}
154+
return nil, fmt.Errorf("no node matched id %s", nodeID)
155+
}

metricbeat/module/elasticsearch/shard/data.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,19 @@ import (
1111

1212
var (
1313
schema = s.Schema{
14-
"state": c.Str("state"),
15-
"primary": c.Bool("primary"),
16-
"node": c.Str("node"),
17-
"index": c.Str("index"),
18-
"shard": c.Int("number"),
14+
"state": c.Str("state"),
15+
"primary": c.Bool("primary"),
16+
"node": c.Str("node"),
17+
"index": c.Str("index"),
18+
"shard": c.Int("number"),
19+
"relocating_node": c.Str("relocating_node"),
1920
}
2021
)
2122

2223
type stateStruct struct {
2324
ClusterName string `json:"cluster_name"`
2425
StateID string `json:"state_uuid"`
26+
MasterNode string `json:"master_node"`
2527
RoutingTable struct {
2628
Indices map[string]struct {
2729
Shards map[string][]map[string]interface{} `json:"shards"`
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package shard
2+
3+
import (
4+
"encoding/json"
5+
"time"
6+
7+
"github.com/elastic/beats/libbeat/common"
8+
"github.com/elastic/beats/metricbeat/mb"
9+
"github.com/elastic/beats/metricbeat/module/elasticsearch"
10+
)
11+
12+
func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) {
13+
stateData := &stateStruct{}
14+
err := json.Unmarshal(content, stateData)
15+
if err != nil {
16+
r.Error(err)
17+
return
18+
}
19+
20+
nodeInfo, err := elasticsearch.GetNodeInfo(m.HTTP, m.HostData().SanitizedURI+statePath, stateData.MasterNode)
21+
if err != nil {
22+
r.Error(err)
23+
return
24+
}
25+
26+
// TODO: This is currently needed because the cluser_uuid is `na` in stateData in case not the full state is requested.
27+
// Will be fixed in: https://github.com/elastic/elasticsearch/pull/30656
28+
clusterID, err := elasticsearch.GetClusterID(m.HTTP, m.HostData().SanitizedURI+statePath, stateData.MasterNode)
29+
if err != nil {
30+
r.Error(err)
31+
return
32+
}
33+
34+
sourceNode := common.MapStr{
35+
"uuid": stateData.MasterNode,
36+
"host": nodeInfo.Host,
37+
"transport_address": nodeInfo.TransportAddress,
38+
"ip": nodeInfo.IP,
39+
// This seems to be in the x-pack data a subset of the cluster_uuid not the name?
40+
"name": stateData.ClusterName,
41+
"timestamp": common.Time(time.Now()),
42+
}
43+
44+
for _, index := range stateData.RoutingTable.Indices {
45+
for _, shards := range index.Shards {
46+
for _, shard := range shards {
47+
event := mb.Event{}
48+
fields, _ := schema.Apply(shard)
49+
50+
fields["shard"] = fields["number"]
51+
delete(fields, "number")
52+
53+
event.RootFields = common.MapStr{}
54+
55+
event.RootFields = common.MapStr{
56+
"timestamp": time.Now(),
57+
"cluster_uuid": clusterID,
58+
"interval_ms": m.Module().Config().Period.Nanoseconds() / 1000 / 1000,
59+
"type": "shards",
60+
"source_node": sourceNode,
61+
"shard": fields,
62+
"state_uuid": stateData.StateID,
63+
}
64+
event.Index = ".monitoring-es-6-mb"
65+
66+
r.Event(event)
67+
68+
}
69+
}
70+
}
71+
}

metricbeat/module/elasticsearch/shard/shard.go

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,55 +2,51 @@ package shard
22

33
import (
44
"github.com/elastic/beats/libbeat/common/cfgwarn"
5-
"github.com/elastic/beats/metricbeat/helper"
65
"github.com/elastic/beats/metricbeat/mb"
7-
"github.com/elastic/beats/metricbeat/mb/parse"
6+
"github.com/elastic/beats/metricbeat/module/elasticsearch"
87
)
98

109
func init() {
1110
mb.Registry.MustAddMetricSet("elasticsearch", "shard", New,
12-
mb.WithHostParser(hostParser),
11+
mb.WithHostParser(elasticsearch.HostParser),
1312
mb.DefaultMetricSet(),
1413
mb.WithNamespace("elasticsearch.shard"),
1514
)
1615
}
1716

18-
var (
19-
hostParser = parse.URLHostParserBuilder{
20-
DefaultScheme: "http",
21-
PathConfigKey: "path",
22-
// Get the stats from the local node
23-
DefaultPath: "_cluster/state/version,master_node,routing_table",
24-
}.Build()
17+
const (
18+
// Get the stats from the local node
19+
statePath = "/_cluster/state/version,master_node,routing_table"
2520
)
2621

2722
// MetricSet type defines all fields of the MetricSet
2823
type MetricSet struct {
29-
mb.BaseMetricSet
30-
http *helper.HTTP
24+
*elasticsearch.MetricSet
3125
}
3226

3327
// New create a new instance of the MetricSet
3428
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
3529
cfgwarn.Beta("The elasticsearch shard metricset is beta")
3630

37-
http, err := helper.NewHTTP(base)
31+
// Get the stats from the local node
32+
ms, err := elasticsearch.NewMetricSet(base, statePath)
3833
if err != nil {
3934
return nil, err
4035
}
41-
return &MetricSet{
42-
BaseMetricSet: base,
43-
http: http,
44-
}, nil
36+
return &MetricSet{MetricSet: ms}, nil
4537
}
4638

4739
// Fetch methods implements the data gathering and data conversion to the right format
4840
func (m *MetricSet) Fetch(r mb.ReporterV2) {
49-
content, err := m.http.FetchContent()
41+
content, err := m.HTTP.FetchContent()
5042
if err != nil {
5143
r.Error(err)
5244
return
5345
}
5446

55-
eventsMapping(r, content)
47+
if m.XPack {
48+
eventsMappingXPack(r, m, content)
49+
} else {
50+
eventsMapping(r, content)
51+
}
5652
}

0 commit comments

Comments
 (0)