Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 34 additions & 5 deletions metricbeat/module/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,21 @@ type Info struct {
ClusterID string `json:"cluster_uuid"`
}

// NodeInfo struct cotains data about the node
type NodeInfo struct {
Host string `json:"host"`
TransportAddress string `json:"transport_address"`
IP string `json:"ip"`
Name string `json:"name"`
}

// GetClusterID fetches cluster id for given nodeID
func GetClusterID(http *helper.HTTP, uri string, nodeID string) (string, error) {
// Check if cluster id already cached. If yes, return it.
if clusterID, ok := clusterIDCache[nodeID]; ok {
return clusterID, nil
}

// Makes sure the http uri is reset to its inital value
defer http.SetURI(uri)

info, err := GetInfo(http, uri)
if err != nil {
return "", err
Expand All @@ -44,8 +49,6 @@ func GetClusterID(http *helper.HTTP, uri string, nodeID string) (string, error)
//
// The two names are compared
func IsMaster(http *helper.HTTP, uri string) (bool, error) {
// Makes sure the http uri is reset to its inital value
defer http.SetURI(uri)

node, err := getNodeName(http, uri)
if err != nil {
Expand Down Expand Up @@ -97,6 +100,7 @@ func getMasterName(http *helper.HTTP, uri string) (string, error) {

// GetInfo returns the data for the Elasticsearch / endpoint
func GetInfo(http *helper.HTTP, uri string) (*Info, error) {
defer http.SetURI(uri)

// Parses the uri to replace the path
u, _ := url.Parse(uri)
Expand All @@ -116,6 +120,8 @@ func GetInfo(http *helper.HTTP, uri string) (*Info, error) {
}

func fetchPath(http *helper.HTTP, uri, path string) ([]byte, error) {
defer http.SetURI(uri)

// Parses the uri to replace the path
u, _ := url.Parse(uri)
u.Path = path
Expand All @@ -124,3 +130,26 @@ func fetchPath(http *helper.HTTP, uri, path string) ([]byte, error) {
http.SetURI(u.String())
return http.FetchContent()
}

// GetNodeInfo returns the node information
func GetNodeInfo(http *helper.HTTP, uri string, nodeID string) (*NodeInfo, error) {

content, err := fetchPath(http, uri, "/_nodes/_local/nodes")
if err != nil {
return nil, err
}

nodesStruct := struct {
Nodes map[string]*NodeInfo `json:"nodes"`
}{}

json.Unmarshal(content, &nodesStruct)

// _local will only fetch one node info. First entry is node name
for k, v := range nodesStruct.Nodes {
if k == nodeID {
return v, nil
}
}
return nil, fmt.Errorf("no node matched id %s", nodeID)
}
12 changes: 7 additions & 5 deletions metricbeat/module/elasticsearch/shard/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,19 @@ import (

var (
schema = s.Schema{
"state": c.Str("state"),
"primary": c.Bool("primary"),
"node": c.Str("node"),
"index": c.Str("index"),
"shard": c.Int("number"),
"state": c.Str("state"),
"primary": c.Bool("primary"),
"node": c.Str("node"),
"index": c.Str("index"),
"shard": c.Int("number"),
"relocating_node": c.Str("relocating_node"),
}
)

type stateStruct struct {
ClusterName string `json:"cluster_name"`
StateID string `json:"state_uuid"`
MasterNode string `json:"master_node"`
RoutingTable struct {
Indices map[string]struct {
Shards map[string][]map[string]interface{} `json:"shards"`
Expand Down
71 changes: 71 additions & 0 deletions metricbeat/module/elasticsearch/shard/data_xpack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package shard

import (
"encoding/json"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/elasticsearch"
)

func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) {
stateData := &stateStruct{}
err := json.Unmarshal(content, stateData)
if err != nil {
r.Error(err)
return
}

nodeInfo, err := elasticsearch.GetNodeInfo(m.HTTP, m.HostData().SanitizedURI+statePath, stateData.MasterNode)
if err != nil {
r.Error(err)
return
}

// TODO: This is currently needed because the cluser_uuid is `na` in stateData in case not the full state is requested.
// Will be fixed in: https://github.com/elastic/elasticsearch/pull/30656
clusterID, err := elasticsearch.GetClusterID(m.HTTP, m.HostData().SanitizedURI+statePath, stateData.MasterNode)
if err != nil {
r.Error(err)
return
}

sourceNode := common.MapStr{
"uuid": stateData.MasterNode,
"host": nodeInfo.Host,
"transport_address": nodeInfo.TransportAddress,
"ip": nodeInfo.IP,
// This seems to be in the x-pack data a subset of the cluster_uuid not the name?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pickypg Should this be a subest of the cluster_uuid?

"name": stateData.ClusterName,
"timestamp": common.Time(time.Now()),
}

for _, index := range stateData.RoutingTable.Indices {
for _, shards := range index.Shards {
for _, shard := range shards {
event := mb.Event{}
fields, _ := schema.Apply(shard)

fields["shard"] = fields["number"]
delete(fields, "number")

event.RootFields = common.MapStr{}

event.RootFields = common.MapStr{
"timestamp": time.Now(),
"cluster_uuid": clusterID,
"interval_ms": m.Module().Config().Period.Nanoseconds() / 1000 / 1000,
"type": "shards",
"source_node": sourceNode,
"shard": fields,
"state_uuid": stateData.StateID,
}
event.Index = ".monitoring-es-6-mb"

r.Event(event)

}
}
}
}
34 changes: 15 additions & 19 deletions metricbeat/module/elasticsearch/shard/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,55 +2,51 @@ package shard

import (
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/metricbeat/helper"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
"github.com/elastic/beats/metricbeat/module/elasticsearch"
)

func init() {
mb.Registry.MustAddMetricSet("elasticsearch", "shard", New,
mb.WithHostParser(hostParser),
mb.WithHostParser(elasticsearch.HostParser),
mb.DefaultMetricSet(),
mb.WithNamespace("elasticsearch.shard"),
)
}

var (
hostParser = parse.URLHostParserBuilder{
DefaultScheme: "http",
PathConfigKey: "path",
// Get the stats from the local node
DefaultPath: "_cluster/state/version,master_node,routing_table",
}.Build()
const (
// Get the stats from the local node
statePath = "/_cluster/state/version,master_node,routing_table"
)

// MetricSet type defines all fields of the MetricSet
type MetricSet struct {
mb.BaseMetricSet
http *helper.HTTP
*elasticsearch.MetricSet
}

// New create a new instance of the MetricSet
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
cfgwarn.Beta("The elasticsearch shard metricset is beta")

http, err := helper.NewHTTP(base)
// Get the stats from the local node
ms, err := elasticsearch.NewMetricSet(base, statePath)
if err != nil {
return nil, err
}
return &MetricSet{
BaseMetricSet: base,
http: http,
}, nil
return &MetricSet{MetricSet: ms}, nil
}

// Fetch methods implements the data gathering and data conversion to the right format
func (m *MetricSet) Fetch(r mb.ReporterV2) {
content, err := m.http.FetchContent()
content, err := m.HTTP.FetchContent()
if err != nil {
r.Error(err)
return
}

eventsMapping(r, content)
if m.XPack {
eventsMappingXPack(r, m, content)
} else {
eventsMapping(r, content)
}
}