Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion metricbeat/helper/elastic/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,13 @@ func MakeXPackMonitoringIndexName(product Product) string {
// ReportErrorForMissingField reports and returns an error message for the given
// field being missing in API response received from a given product
func ReportErrorForMissingField(field string, product Product, r mb.ReporterV2) error {
err := fmt.Errorf("Could not find field '%v' in %v stats API response", field, strings.Title(product.String()))
err := MakeErrorForMissingField(field, product)
r.Error(err)
return err
}

// MakeErrorForMissingField builds the error used when an expected field is
// absent from a product's stats API response. The message names the field and
// the title-cased product name; nothing is reported or logged here.
func MakeErrorForMissingField(field string, product Product) error {
	productName := strings.Title(product.String())
	return fmt.Errorf("Could not find field '%v' in %v stats API response", field, productName)
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) {

// Not master, no event sent
if !isMaster {
logp.Debug("elasticsearch", "Trying to fetch index recovery stats from a non master node.")
logp.Debug("elasticsearch", "Trying to fetch cluster stats from a non master node.")
return
}

Expand All @@ -73,5 +73,9 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) {
return
}

eventMapping(r, content)
if m.MetricSet.XPack {
eventMappingXPack(r, m, content)
} else {
eventMapping(r, content)
}
}
339 changes: 339 additions & 0 deletions metricbeat/module/elasticsearch/cluster_stats/data_xpack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,339 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package cluster_stats

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't use an underscore in package name


import (
"encoding/json"
"fmt"
"hash/fnv"
"sort"
"strings"
"time"

"github.com/elastic/beats/metricbeat/helper/elastic"
"github.com/elastic/beats/metricbeat/module/elasticsearch"

"github.com/elastic/beats/libbeat/common"
s "github.com/elastic/beats/libbeat/common/schema"
c "github.com/elastic/beats/libbeat/common/schema/mapstriface"
"github.com/elastic/beats/metricbeat/mb"
)

var (
// clusterStatsSchema describes which fields of the decoded cluster stats
// document are kept and how they are converted. It is applied in
// eventMappingXPack to produce the "cluster_stats" object of the event.
//
// Every entry reads from the same key it writes to, so the output mirrors
// the input layout: c.Str leaves are read as strings, c.Int leaves as
// integers, and c.Dict entries descend into a nested object.
// NOTE(review): the input is presumably the Elasticsearch cluster stats
// API response — confirm against the Fetch implementation.
clusterStatsSchema = s.Schema{
"cluster_uuid": c.Str("cluster_uuid"),
"timestamp": c.Int("timestamp"),
"status": c.Str("status"),
// Index-level statistics.
"indices": c.Dict("indices", s.Schema{
"count": c.Int("count"),
"shards": c.Dict("shards", s.Schema{
"total": c.Int("total"),
"primaries": c.Int("primaries"),
"replication": c.Int("replication"),
// Per-index min/max/avg figures for shards, primaries and replication.
"index": c.Dict("index", s.Schema{
"shards": c.Dict("shards", s.Schema{
"min": c.Int("min"),
"max": c.Int("max"),
"avg": c.Int("avg"),
}),
"primaries": c.Dict("primaries", s.Schema{
"min": c.Int("min"),
"max": c.Int("max"),
"avg": c.Int("avg"),
}),
"replication": c.Dict("replication", s.Schema{
"min": c.Int("min"),
"max": c.Int("max"),
"avg": c.Int("avg"),
}),
}),
}),
"docs": c.Dict("docs", s.Schema{
"count": c.Int("count"),
"deleted": c.Int("deleted"),
}),
"store": c.Dict("store", s.Schema{
"size_in_bytes": c.Int("size_in_bytes"),
}),
"fielddata": c.Dict("fielddata", s.Schema{
"memory_size_in_bytes": c.Int("memory_size_in_bytes"),
"evictions": c.Int("evictions"),
}),
"query_cache": c.Dict("query_cache", s.Schema{
"memory_size_in_bytes": c.Int("memory_size_in_bytes"),
"total_count": c.Int("total_count"),
"hit_count": c.Int("hit_count"),
"miss_count": c.Int("miss_count"),
"cache_size": c.Int("cache_size"),
"cache_count": c.Int("cache_count"),
"evictions": c.Int("evictions"),
}),
"completion": c.Dict("completion", s.Schema{
"size_in_bytes": c.Int("size_in_bytes"),
}),
"segments": c.Dict("segments", s.Schema{
"count": c.Int("count"),
"memory_in_bytes": c.Int("memory_in_bytes"),
"terms_memory_in_bytes": c.Int("terms_memory_in_bytes"),
"stored_fields_memory_in_bytes": c.Int("stored_fields_memory_in_bytes"),
"term_vectors_memory_in_bytes": c.Int("term_vectors_memory_in_bytes"),
"norms_memory_in_bytes": c.Int("norms_memory_in_bytes"),
"points_memory_in_bytes": c.Int("points_memory_in_bytes"),
"doc_values_memory_in_bytes": c.Int("doc_values_memory_in_bytes"),

"index_writer_memory_in_bytes": c.Int("index_writer_memory_in_bytes"),
"version_map_memory_in_bytes": c.Int("version_map_memory_in_bytes"),
"fixed_bit_set_memory_in_bytes": c.Int("fixed_bit_set_memory_in_bytes"),
"max_unsafe_auto_id_timestamp": c.Int("max_unsafe_auto_id_timestamp"),
}),
}),
// Node-level statistics.
"nodes": c.Dict("nodes", s.Schema{
"count": c.Dict("count", s.Schema{
"total": c.Int("total"),
"data": c.Int("data"),
"coordinating_only": c.Int("coordinating_only"),
"master": c.Int("master"),
"ingest": c.Int("ingest"),
}),
"os": c.Dict("os", s.Schema{
"available_processors": c.Int("available_processors"),
"allocated_processors": c.Int("allocated_processors"),
"mem": c.Dict("mem", s.Schema{
"total_in_bytes": c.Int("total_in_bytes"),
"free_in_bytes": c.Int("free_in_bytes"),
"used_in_bytes": c.Int("used_in_bytes"),
"free_percent": c.Int("free_percent"),
"used_percent": c.Int("used_percent"),
}),
}),
"process": c.Dict("process", s.Schema{
"cpu": c.Dict("cpu", s.Schema{
"percent": c.Int("percent"),
}),
"open_file_descriptors": c.Dict("open_file_descriptors", s.Schema{
"min": c.Int("min"),
"max": c.Int("max"),
"avg": c.Int("avg"),
}),
}),
"jvm": c.Dict("jvm", s.Schema{
"max_uptime_in_millis": c.Int("max_uptime_in_millis"),
"mem": c.Dict("mem", s.Schema{
"heap_used_in_bytes": c.Int("heap_used_in_bytes"),
"heap_max_in_bytes": c.Int("heap_max_in_bytes"),
}),
"threads": c.Int("threads"),
}),
"fs": c.Dict("fs", s.Schema{
"total_in_bytes": c.Int("total_in_bytes"),
"free_in_bytes": c.Int("free_in_bytes"),
"available_in_bytes": c.Int("available_in_bytes"),
}),
}),
}
)

// passthruField copies the value stored at fieldPath in sourceData to the same
// path in targetData without converting it.
//
// It returns the missing-field error built by elastic.MakeErrorForMissingField
// when fieldPath cannot be read from sourceData, and the error returned by
// targetData.Put when the value cannot be written. It returns nil on success.
func passthruField(fieldPath string, sourceData, targetData common.MapStr) error {
	fieldValue, err := sourceData.GetValue(fieldPath)
	if err != nil {
		return elastic.MakeErrorForMissingField(fieldPath, elastic.Elasticsearch)
	}

	// Put also returns the value it replaced, which is not needed here. Its
	// error must not be dropped: otherwise a failed write would be reported as
	// success and the field would silently go missing from the event.
	if _, err = targetData.Put(fieldPath, fieldValue); err != nil {
		return err
	}
	return nil
}

// clusterNeedsTLSEnabled reports whether TLS still has to be enabled on the
// transport protocol. The result is true only when the license type is
// "trial", security is enabled, and transport SSL is not already enabled.
//
// license is read at "license.type"; stackStats is read at "security.enabled"
// and "security.ssl.transport.enabled". The stackStats lookups happen only if
// the preceding checks did not already settle the answer. A failed lookup or a
// value of an unexpected type yields false together with an error.
func clusterNeedsTLSEnabled(license, stackStats common.MapStr) (bool, error) {
	// Any license type other than trial means TLS does not need to be enabled.
	rawType, err := license.GetValue("license.type")
	if err != nil {
		return false, err
	}
	switch licenseType, isString := rawType.(string); {
	case !isString:
		return false, fmt.Errorf("License type is not a string")
	case licenseType != "trial":
		return false, nil
	}

	// With security disabled, TLS does not need to be enabled either.
	rawSecurity, err := stackStats.GetValue("security.enabled")
	if err != nil {
		return false, err
	}
	switch securityOn, isBool := rawSecurity.(bool); {
	case !isBool:
		return false, fmt.Errorf("Security enabled flag is not a boolean")
	case !securityOn:
		return false, nil
	}

	// What remains is whether the transport protocol already has TLS enabled.
	rawTransport, err := stackStats.GetValue("security.ssl.transport.enabled")
	if err != nil {
		return false, err
	}
	if transportTLSOn, isBool := rawTransport.(bool); isBool {
		return !transportTLSOn, nil
	}
	return false, fmt.Errorf("Transport protocol SSL enabled flag is not a boolean")
}

// computeNodesHash computes a simple hash value that can be used to determine if the nodes listing has changed since the last report.
func computeNodesHash(clusterState common.MapStr) (int32, error) {
Copy link
Contributor Author

@ycombinator ycombinator Aug 15, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value, err := clusterState.GetValue("nodes")
if err != nil {
return 0, err
}

nodes, ok := value.(map[string]interface{})
if !ok {
return 0, fmt.Errorf("Nodes is not a map")
}

var nodeEphemeralIDs []string
for _, value := range nodes {
nodeData, ok := value.(map[string]interface{})
if !ok {
return 0, fmt.Errorf("Node data is not a map")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: In general, errors that are returned rather than printed should start with a lowercase letter, since they may be embedded in other errors. I wonder why hound did not complain. The same applies to most other errors in this PR.

}

value, ok := nodeData["ephemeral_id"]
if !ok {
return 0, fmt.Errorf("Node data does not contain ephemeral ID")
}

ephemeralID, ok := value.(string)
if !ok {
return 0, fmt.Errorf("Node ephemeral ID is not a string")
}

nodeEphemeralIDs = append(nodeEphemeralIDs, ephemeralID)
}

sort.Strings(nodeEphemeralIDs)

combinedNodeEphemeralIDs := strings.Join(nodeEphemeralIDs, "")
return hash(combinedNodeEphemeralIDs), nil
}

// hash returns the 32-bit FNV-1 hash of s, reinterpreted as a signed integer.
func hash(s string) int32 {
	digest := fnv.New32()
	_, _ = digest.Write([]byte(s))
	sum := digest.Sum32()

	// The Elasticsearch mapping for this value is a 32-bit *signed* integer,
	// hence the conversion from the unsigned sum.
	return int32(sum)
}

func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error {
var data map[string]interface{}
err := json.Unmarshal(content, &data)
if err != nil {
return err
}

clusterStats, err := clusterStatsSchema.Apply(data)
if err != nil {
return err
}

dataMS := common.MapStr(data)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps add a type assertion even though this should never fail I think.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How should I do this?

I tried changing this line to:

dataMS, ok := data.(common.MapStr)

but that gives me the compile-time error:

invalid type assertion: data.(common.MapStr) (non-interface type map[string]interface {} on left)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ignore my comment, above is good.


passthruFields := []string{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, it seems that outside of X-Pack reporting these values should be reported by the specific metricsets; to get all names, for example, would it be an aggregation based on the cluster ID?

"indices.segments.file_sizes", // object with dynamic keys
"nodes.versions", // array of strings
"nodes.os.names", // array of objects
"nodes.jvm.versions", // array of objects
"nodes.plugins", // array of objects
"nodes.network_types", // object with dynamic keys
}
for _, fieldPath := range passthruFields {
if err = passthruField(fieldPath, dataMS, clusterStats); err != nil {
return err
}
}

clusterName, ok := data["cluster_name"].(string)
if !ok {
return elastic.MakeErrorForMissingField("cluster_name", elastic.Elasticsearch)
}

info, err := elasticsearch.GetInfo(m.HTTP, m.HTTP.GetURI())
if err != nil {
return err
}

license, err := elasticsearch.GetLicense(m.HTTP, m.HTTP.GetURI())
if err != nil {
return err
}

clusterState, err := elasticsearch.GetClusterState(m.HTTP, m.HTTP.GetURI())
if err != nil {
return err
}

if err = passthruField("status", dataMS, clusterState); err != nil {
return err
}

nodesHash, err := computeNodesHash(clusterState)
if err != nil {
return err
}
clusterState.Put("nodes_hash", nodesHash)

stackStats, err := elasticsearch.GetStackStats(m.HTTP, m.HTTP.GetURI())
if err != nil {
return err
}

// TODO: Inject `apm.found` field under stackStats object

clusterNeedsTLS, err := clusterNeedsTLSEnabled(license, stackStats)
if err != nil {
return err
}
license.Put("cluster_needs_tls", clusterNeedsTLS) // This powers a cluster alert for enabling TLS on the ES transport protocol

event := mb.Event{}
event.RootFields = common.MapStr{
"cluster_uuid": info.ClusterID,
"cluster_name": clusterName,
"timestamp": common.Time(time.Now()),
"interval_ms": m.Module().Config().Period / time.Millisecond,
"type": "cluster_stats",
"license": license,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the full license response reported here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is the response from the GET _xpack/license Elasticsearch API, which looks like this:

{
  "license": {
    "status": "active",
    "uid": "f80c1fb5-75e9-4536-be2d-a768b02abb46",
    "type": "basic",
    "issue_date": "2018-08-06T23:47:09.619Z",
    "issue_date_in_millis": 1533599229619,
    "max_nodes": 1000,
    "issued_to": "elasticsearch",
    "issuer": "elasticsearch",
    "start_date_in_millis": -1
  }
}

"version": info.Version.Number,
"cluster_stats": clusterStats,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ruflin As you can see here, the document that is indexed into monitoring contains not only clusterStats but also clusterState (an abridged version of it) and stackStats.

For clusterState and stackStats we are simply calling the corresponding Elasticsearch APIs, doing a tiny bit of massaging of the response data, and then passing the resulting structure through over here. We are definitely not doing a complete schema conversion like we are doing for clusterStats.

It makes me wonder: should we treat clusterStats special over here? Or conversely, should we do complete schema conversions for clusterState and stackStats too?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is the internal reporting in ES happening? Is it just taking all the existing fields? If yes then we should do the same here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passing through cluster_stats as-is in 3b11529.

"cluster_state": clusterState,
"stack_stats": stackStats,
}

event.Index = elastic.MakeXPackMonitoringIndexName(elastic.Elasticsearch)
r.Event(event)

return nil
}
Loading