Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion metricbeat/helper/elastic/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,13 @@ func MakeXPackMonitoringIndexName(product Product) string {
// ReportErrorForMissingField reports and returns an error message for the given
// field being missing in API response received from a given product.
//
// The error is built by MakeErrorForMissingField, handed to the reporter through
// r.Error, and then returned so the caller can also act on it.
func ReportErrorForMissingField(field string, product Product, r mb.ReporterV2) error {
	// err is declared exactly once; a second `err :=` in the same scope would
	// not compile ("no new variables on left side of :=").
	err := MakeErrorForMissingField(field, product)
	r.Error(err)
	return err
}

// MakeErrorForMissingField builds (but does not report) the error describing
// that the given field could not be found in the stats API response of the
// given product. The product name in the message is product.String() passed
// through strings.Title.
func MakeErrorForMissingField(field string, product Product) error {
	productName := strings.Title(product.String())
	return fmt.Errorf("Could not find field '%v' in %v stats API response", field, productName)
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) {

// Not master, no event sent
if !isMaster {
logp.Debug("elasticsearch", "Trying to fetch index recovery stats from a non master node.")
logp.Debug("elasticsearch", "Trying to fetch cluster stats from a non master node.")
return
}

Expand All @@ -73,5 +73,9 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) {
return
}

eventMapping(r, content)
if m.MetricSet.XPack {
eventMappingXPack(r, m, content)
} else {
eventMapping(r, content)
}
}
238 changes: 238 additions & 0 deletions metricbeat/module/elasticsearch/cluster_stats/data_xpack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package cluster_stats

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't use an underscore in package name


import (
"encoding/json"
"time"

"github.com/elastic/beats/metricbeat/helper/elastic"
"github.com/elastic/beats/metricbeat/module/elasticsearch"

"github.com/elastic/beats/libbeat/common"
s "github.com/elastic/beats/libbeat/common/schema"
c "github.com/elastic/beats/libbeat/common/schema/mapstriface"
"github.com/elastic/beats/metricbeat/mb"
)

var (
// clusterStatsSchema is the schema that eventMappingXPack applies to the
// decoded cluster stats API response. Every output key is identical to the
// source key passed to its converter (c.Str, c.Int or c.Dict), so the nesting
// of the input is kept as-is for the fields listed here.
// NOTE(review): fields that are not listed here are presumably dropped by
// Apply; eventMappingXPack copies a handful of them back in through its
// passthruFields list — confirm against the schema package.
clusterStatsSchema = s.Schema{
"cluster_uuid": c.Str("cluster_uuid"),
"timestamp": c.Int("timestamp"),
"status": c.Str("status"),
// Index-level statistics.
"indices": c.Dict("indices", s.Schema{
"count": c.Int("count"),
"shards": c.Dict("shards", s.Schema{
"total": c.Int("total"),
"primaries": c.Int("primaries"),
"replication": c.Int("replication"),
// min/max/avg figures for shards, primaries and replication.
"index": c.Dict("index", s.Schema{
"shards": c.Dict("shards", s.Schema{
"min": c.Int("min"),
"max": c.Int("max"),
"avg": c.Int("avg"),
}),
"primaries": c.Dict("primaries", s.Schema{
"min": c.Int("min"),
"max": c.Int("max"),
"avg": c.Int("avg"),
}),
"replication": c.Dict("replication", s.Schema{
"min": c.Int("min"),
"max": c.Int("max"),
"avg": c.Int("avg"),
}),
}),
}),
"docs": c.Dict("docs", s.Schema{
"count": c.Int("count"),
"deleted": c.Int("deleted"),
}),
"store": c.Dict("store", s.Schema{
"size_in_bytes": c.Int("size_in_bytes"),
}),
"fielddata": c.Dict("fielddata", s.Schema{
"memory_size_in_bytes": c.Int("memory_size_in_bytes"),
"evictions": c.Int("evictions"),
}),
"query_cache": c.Dict("query_cache", s.Schema{
"memory_size_in_bytes": c.Int("memory_size_in_bytes"),
"total_count": c.Int("total_count"),
"hit_count": c.Int("hit_count"),
"miss_count": c.Int("miss_count"),
"cache_size": c.Int("cache_size"),
"cache_count": c.Int("cache_count"),
"evictions": c.Int("evictions"),
}),
"completion": c.Dict("completion", s.Schema{
"size_in_bytes": c.Int("size_in_bytes"),
}),
"segments": c.Dict("segments", s.Schema{
"count": c.Int("count"),
"memory_in_bytes": c.Int("memory_in_bytes"),
"terms_memory_in_bytes": c.Int("terms_memory_in_bytes"),
"stored_fields_memory_in_bytes": c.Int("stored_fields_memory_in_bytes"),
"term_vectors_memory_in_bytes": c.Int("term_vectors_memory_in_bytes"),
"norms_memory_in_bytes": c.Int("norms_memory_in_bytes"),
"points_memory_in_bytes": c.Int("points_memory_in_bytes"),
"doc_values_memory_in_bytes": c.Int("doc_values_memory_in_bytes"),

"index_writer_memory_in_bytes": c.Int("index_writer_memory_in_bytes"),
"version_map_memory_in_bytes": c.Int("version_map_memory_in_bytes"),
"fixed_bit_set_memory_in_bytes": c.Int("fixed_bit_set_memory_in_bytes"),
"max_unsafe_auto_id_timestamp": c.Int("max_unsafe_auto_id_timestamp"),
}),
}),
// Node-level statistics.
"nodes": c.Dict("nodes", s.Schema{
"count": c.Dict("count", s.Schema{
"total": c.Int("total"),
"data": c.Int("data"),
"coordinating_only": c.Int("coordinating_only"),
"master": c.Int("master"),
"ingest": c.Int("ingest"),
}),
"os": c.Dict("os", s.Schema{
"available_processors": c.Int("available_processors"),
"allocated_processors": c.Int("allocated_processors"),
"mem": c.Dict("mem", s.Schema{
"total_in_bytes": c.Int("total_in_bytes"),
"free_in_bytes": c.Int("free_in_bytes"),
"used_in_bytes": c.Int("used_in_bytes"),
"free_percent": c.Int("free_percent"),
"used_percent": c.Int("used_percent"),
}),
}),
"process": c.Dict("process", s.Schema{
"cpu": c.Dict("cpu", s.Schema{
"percent": c.Int("percent"),
}),
"open_file_descriptors": c.Dict("open_file_descriptors", s.Schema{
"min": c.Int("min"),
"max": c.Int("max"),
"avg": c.Int("avg"),
}),
}),
"jvm": c.Dict("jvm", s.Schema{
"max_uptime_in_millis": c.Int("max_uptime_in_millis"),
"mem": c.Dict("mem", s.Schema{
"heap_used_in_bytes": c.Int("heap_used_in_bytes"),
"heap_max_in_bytes": c.Int("heap_max_in_bytes"),
}),
"threads": c.Int("threads"),
}),
"fs": c.Dict("fs", s.Schema{
"total_in_bytes": c.Int("total_in_bytes"),
"free_in_bytes": c.Int("free_in_bytes"),
"available_in_bytes": c.Int("available_in_bytes"),
}),
}),
}
)

// passthruField copies the value found at fieldPath in sourceData into
// targetData under the same path, without any schema conversion.
//
// It returns a missing-field error for the Elasticsearch product when fieldPath
// cannot be read from sourceData, the error from Put when the value cannot be
// stored in targetData, and nil on success.
func passthruField(fieldPath string, sourceData, targetData common.MapStr) error {
	fieldValue, err := sourceData.GetValue(fieldPath)
	if err != nil {
		// Any lookup failure is reported as the field being missing.
		return elastic.MakeErrorForMissingField(fieldPath, elastic.Elasticsearch)
	}

	// Surface write failures instead of dropping them, so a value that could
	// not be stored is never reported as a successful passthrough.
	if _, err = targetData.Put(fieldPath, fieldValue); err != nil {
		return err
	}
	return nil
}

func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error {
var data map[string]interface{}
err := json.Unmarshal(content, &data)
if err != nil {
return err
}

clusterStats, err := clusterStatsSchema.Apply(data)
if err != nil {
return err
}

dataMS := common.MapStr(data)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps add a type assertion here, even though I think this should never fail.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How should I do this?

I tried changing this line to:

dataMS, ok := data.(common.MapStr)

but that gives me the compile-time error:

invalid type assertion: data.(common.MapStr) (non-interface type map[string]interface {} on left)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ignore my comment, above is good.


passthruFields := []string{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, it seems these are values which, outside of X-Pack reporting, should be reported by the specific metricsets. To get all names, for example, would it be an aggregation based on the cluster ID?

"indices.segments.file_sizes", // object with dynamic keys
"nodes.versions", // array of strings
"nodes.os.names", // array of objects
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it a list of names, e.g. ["a", "b"], or a list of nested objects, e.g. [{"a":"b"}, {"c":"d"}]? Just curious.

Copy link
Contributor Author

@ycombinator ycombinator Aug 15, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a list of objects, for example:

[
  {
    "name": "Mac OS X",
    "count": 1
  }
]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:-(

"nodes.jvm.versions", // array of objects
"nodes.plugins", // array of objects
"nodes.network_types", // object with dynamic keys
}
for _, fieldPath := range passthruFields {
if err = passthruField(fieldPath, dataMS, clusterStats); err != nil {
return err
}
}

clusterName, ok := data["cluster_name"].(string)
if !ok {
return elastic.MakeErrorForMissingField("cluster_name", elastic.Elasticsearch)
}

info, err := elasticsearch.GetInfo(m.HTTP, m.HTTP.GetURI())
if err != nil {
return err
}

license, err := elasticsearch.GetLicense(m.HTTP, m.HTTP.GetURI())
if err != nil {
return err
}

// TODO: Inject `cluster_needs_tls` field under license object

clusterState, err := elasticsearch.GetClusterState(m.HTTP, m.HTTP.GetURI())
if err != nil {
return err
}

if err = passthruField("status", dataMS, clusterState); err != nil {
return err
}

// TODO: Compute and inject `node_hash` field under clusterState object

stackStats, err := elasticsearch.GetStackStats(m.HTTP, m.HTTP.GetURI())
if err != nil {
return err
}

// TODO: Inject `apm.found` field under stackStats object

event := mb.Event{}
event.RootFields = common.MapStr{
"cluster_uuid": info.ClusterID,
"cluster_name": clusterName,
"timestamp": common.Time(time.Now()),
"interval_ms": m.Module().Config().Period / time.Millisecond,
"type": "cluster_stats",
"license": license,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the full license response reported here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is the response from the GET _xpack/license Elasticsearch API, which looks like this:

{
  "license": {
    "status": "active",
    "uid": "f80c1fb5-75e9-4536-be2d-a768b02abb46",
    "type": "basic",
    "issue_date": "2018-08-06T23:47:09.619Z",
    "issue_date_in_millis": 1533599229619,
    "max_nodes": 1000,
    "issued_to": "elasticsearch",
    "issuer": "elasticsearch",
    "start_date_in_millis": -1
  }
}

"version": info.Version.Number,
"cluster_stats": clusterStats,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ruflin As you can see here, the document that is indexed into monitoring contains not only clusterStats but also clusterState (an abridged version of it) and stackStats.

For clusterState and stackStats we are simply calling the corresponding Elasticsearch APIs, doing a tiny bit of massaging of the response data, and then passing the resulting structure through over here. We are definitely not doing a complete schema conversion like we are doing for clusterStats.

It makes me wonder: should we treat clusterStats special over here? Or conversely, should we do complete schema conversions for clusterState and stackStats too?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is the internal reporting in ES happening? Is it just taking all the existing fields? If yes then we should do the same here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passing through cluster_stats as-is in 3b11529.

"cluster_state": clusterState,
"stack_stats": stackStats,
}

event.Index = elastic.MakeXPackMonitoringIndexName(elastic.Elasticsearch)
r.Event(event)

return nil
}
39 changes: 39 additions & 0 deletions metricbeat/module/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ var clusterIDCache = map[string]string{}
type Info struct {
// ClusterName maps to the "cluster_name" JSON key.
ClusterName string `json:"cluster_name"`
// ClusterID maps to the "cluster_uuid" JSON key.
ClusterID string `json:"cluster_uuid"`
// Version maps to the nested "version" JSON object; only its "number" key
// is captured.
Version struct {
// Number maps to the "number" JSON key inside "version".
Number string `json:"number"`
} `json:"version"`
}

// NodeInfo struct contains data about the node
Expand Down Expand Up @@ -167,3 +170,39 @@ func GetNodeInfo(http *helper.HTTP, uri string, nodeID string) (*NodeInfo, error
}
return nil, fmt.Errorf("no node matched id %s", nodeID)
}

// GetLicense returns license information
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a note here that the license is cached?

func GetLicense(http *helper.HTTP, resetURI string) (map[string]interface{}, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we cache this value for a few minutes, as the license is hopefully going to change very rarely?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. Will do.

Copy link
Contributor Author

@ycombinator ycombinator Aug 15, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented in 5abe3b0. Curious to hear your thoughts on the implementation — too complex, too simple, just right for now?

content, err := fetchPath(http, resetURI, "_xpack/license")
if err != nil {
return nil, err
}

var license map[string]interface{}
err = json.Unmarshal(content, &license)
return license, err
}

// GetClusterState requests "_cluster/state/version,master_node,nodes" through
// fetchPath and decodes the JSON response body into a generic map.
//
// A fetch failure yields (nil, err). A decoding failure yields the error from
// json.Unmarshal together with whatever the decoder left in the map.
func GetClusterState(http *helper.HTTP, resetURI string) (map[string]interface{}, error) {
	body, err := fetchPath(http, resetURI, "_cluster/state/version,master_node,nodes")
	if err != nil {
		return nil, err
	}

	var state map[string]interface{}
	if err = json.Unmarshal(body, &state); err != nil {
		return state, err
	}
	return state, nil
}

// GetStackStats requests "_xpack/usage" (stack usage information) through
// fetchPath and decodes the JSON response body into a generic map.
//
// A fetch failure yields (nil, err). A decoding failure yields the error from
// json.Unmarshal together with whatever the decoder left in the map.
func GetStackStats(http *helper.HTTP, resetURI string) (map[string]interface{}, error) {
	body, err := fetchPath(http, resetURI, "_xpack/usage")
	if err != nil {
		return nil, err
	}

	var usage map[string]interface{}
	if err = json.Unmarshal(body, &usage); err != nil {
		return usage, err
	}
	return usage, nil
}