diff --git a/metricbeat/helper/elastic/elastic.go b/metricbeat/helper/elastic/elastic.go index a075a858f9c8..9562f8baf7fc 100644 --- a/metricbeat/helper/elastic/elastic.go +++ b/metricbeat/helper/elastic/elastic.go @@ -82,7 +82,13 @@ func MakeXPackMonitoringIndexName(product Product) string { // ReportErrorForMissingField reports and returns an error message for the given // field being missing in API response received from a given product func ReportErrorForMissingField(field string, product Product, r mb.ReporterV2) error { - err := fmt.Errorf("Could not find field '%v' in %v stats API response", field, strings.Title(product.String())) + err := MakeErrorForMissingField(field, product) r.Error(err) return err } + +// MakeErrorForMissingField returns an error message for the given field being missing in an API +// response received from a given product +func MakeErrorForMissingField(field string, product Product) error { + return fmt.Errorf("Could not find field '%v' in %v stats API response", field, strings.Title(product.String())) +} diff --git a/metricbeat/module/elasticsearch/cluster_stats/cluster_stats.go b/metricbeat/module/elasticsearch/cluster_stats/cluster_stats.go index c6d848e7c1a3..3e573701f384 100644 --- a/metricbeat/module/elasticsearch/cluster_stats/cluster_stats.go +++ b/metricbeat/module/elasticsearch/cluster_stats/cluster_stats.go @@ -63,7 +63,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { // Not master, no event sent if !isMaster { - logp.Debug("elasticsearch", "Trying to fetch index recovery stats from a non master node.") + logp.Debug("elasticsearch", "Trying to fetch cluster stats from a non master node.") return } @@ -73,5 +73,9 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { return } - eventMapping(r, content) + if m.MetricSet.XPack { + eventMappingXPack(r, m, content) + } else { + eventMapping(r, content) + } } diff --git a/metricbeat/module/elasticsearch/cluster_stats/data_xpack.go b/metricbeat/module/elasticsearch/cluster_stats/data_xpack.go new file mode 100644 index 000000000000..0ac7223e1588 --- /dev/null +++ b/metricbeat/module/elasticsearch/cluster_stats/data_xpack.go @@ -0,0 +1,239 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cluster_stats + +import ( + "encoding/json" + "fmt" + "hash/fnv" + "sort" + "strings" + "time" + + "github.com/elastic/beats/metricbeat/helper/elastic" + "github.com/elastic/beats/metricbeat/module/elasticsearch" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/metricbeat/mb" +) + +func passthruField(fieldPath string, sourceData, targetData common.MapStr) error { + fieldValue, err := sourceData.GetValue(fieldPath) + if err != nil { + return elastic.MakeErrorForMissingField(fieldPath, elastic.Elasticsearch) + } + + targetData.Put(fieldPath, fieldValue) + return nil +} + +func clusterNeedsTLSEnabled(license, stackStats common.MapStr) (bool, error) { + // TLS does not need to be enabled if license type is something other than trial + value, err := license.GetValue("license.type") + if err != nil { + return false, elastic.MakeErrorForMissingField("license.type", elastic.Elasticsearch) + } + + licenseType, ok := value.(string) + if !ok { + return false, fmt.Errorf("license type is not a string") + } + + if licenseType != "trial" { + return false, nil + } + + // TLS does not need to be enabled if security is not enabled + value, err = stackStats.GetValue("security.enabled") + if err != nil { + return false, elastic.MakeErrorForMissingField("security.enabled", elastic.Elasticsearch) + } + + isSecurityEnabled, ok := value.(bool) + if !ok { + return false, fmt.Errorf("security enabled flag is not a boolean") + } + + if !isSecurityEnabled { + return false, nil + } + + // TLS does not need to be enabled if TLS is already enabled on the transport protocol + value, err = stackStats.GetValue("security.ssl.transport.enabled") + if err != nil { + return false, elastic.MakeErrorForMissingField("security.ssl.transport.enabled", elastic.Elasticsearch) + } + + isTLSAlreadyEnabled, ok := value.(bool) + if !ok { + return false, fmt.Errorf("transport protocol SSL enabled flag is not a boolean") + } + + return !isTLSAlreadyEnabled, nil +} + +// computeNodesHash computes a simple hash value that can be used to determine if the nodes listing has changed since the last report. +func computeNodesHash(clusterState common.MapStr) (int32, error) { + value, err := clusterState.GetValue("nodes") + if err != nil { + return 0, elastic.MakeErrorForMissingField("nodes", elastic.Elasticsearch) + } + + nodes, ok := value.(map[string]interface{}) + if !ok { + return 0, fmt.Errorf("nodes is not a map") + } + + var nodeEphemeralIDs []string + for _, value := range nodes { + nodeData, ok := value.(map[string]interface{}) + if !ok { + return 0, fmt.Errorf("node data is not a map") + } + + value, ok := nodeData["ephemeral_id"] + if !ok { + return 0, fmt.Errorf("node data does not contain ephemeral ID") + } + + ephemeralID, ok := value.(string) + if !ok { + return 0, fmt.Errorf("node ephemeral ID is not a string") + } + + nodeEphemeralIDs = append(nodeEphemeralIDs, ephemeralID) + } + + sort.Strings(nodeEphemeralIDs) + + combinedNodeEphemeralIDs := strings.Join(nodeEphemeralIDs, "") + return hash(combinedNodeEphemeralIDs), nil +} + +func hash(s string) int32 { + h := fnv.New32() + h.Write([]byte(s)) + return int32(h.Sum32()) // This cast is needed because the ES mapping is for a 32-bit *signed* integer +} + +func apmIndicesExist(clusterState common.MapStr) (bool, error) { + value, err := clusterState.GetValue("routing_table.indices") + if err != nil { + return false, elastic.MakeErrorForMissingField("routing_table.indices", elastic.Elasticsearch) + } + + indices, ok := value.(map[string]interface{}) + if !ok { + return false, fmt.Errorf("routing table indices is not a map") + } + + for name := range indices { + if strings.HasPrefix(name, "apm-") { + return true, nil + } + } + + return false, nil +} + +func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error { + var data map[string]interface{} + err := json.Unmarshal(content, &data) + if err != nil { + return err + } + + clusterStats := common.MapStr(data) + + value, err := clusterStats.GetValue("cluster_name") + if err != nil { + return elastic.MakeErrorForMissingField("cluster_name", elastic.Elasticsearch) + } + clusterName, ok := value.(string) + if !ok { + return fmt.Errorf("cluster name is not a string") + } + + info, err := elasticsearch.GetInfo(m.HTTP, m.HTTP.GetURI()) + if err != nil { + return err + } + + license, err := elasticsearch.GetLicense(m.HTTP, m.HTTP.GetURI()) + if err != nil { + return err + } + + clusterState, err := elasticsearch.GetClusterState(m.HTTP, m.HTTP.GetURI()) + if err != nil { + return err + } + + if err = passthruField("status", clusterStats, clusterState); err != nil { + return err + } + + nodesHash, err := computeNodesHash(clusterState) + if err != nil { + return err + } + clusterState.Put("nodes_hash", nodesHash) + + usage, err := elasticsearch.GetStackUsage(m.HTTP, m.HTTP.GetURI()) + if err != nil { + return err + } + + clusterNeedsTLS, err := clusterNeedsTLSEnabled(license, usage) + if err != nil { + return err + } + license.Put("cluster_needs_tls", clusterNeedsTLS) // This powers a cluster alert for enabling TLS on the ES transport protocol + + isAPMFound, err := apmIndicesExist(clusterState) + if err != nil { + return err + } + delete(clusterState, "routing_table") // We don't want to index the routing table in monitoring indices + + stackStats := map[string]interface{}{ + "xpack": usage, + "apm": map[string]interface{}{ + "found": isAPMFound, + }, + } + + event := mb.Event{} + event.RootFields = common.MapStr{ + "cluster_uuid": info.ClusterID, + "cluster_name": clusterName, + "timestamp": common.Time(time.Now()), + "interval_ms": m.Module().Config().Period / time.Millisecond, + "type": "cluster_stats", + "license": license, + "version": info.Version.Number, + "cluster_stats": clusterStats, + "cluster_state": clusterState, + "stack_stats": stackStats, + } + + event.Index = elastic.MakeXPackMonitoringIndexName(elastic.Elasticsearch) + r.Event(event) + + return nil +} diff --git a/metricbeat/module/elasticsearch/elasticsearch.go b/metricbeat/module/elasticsearch/elasticsearch.go index 43be677a5c0c..72d62c1d0ddd 100644 --- a/metricbeat/module/elasticsearch/elasticsearch.go +++ b/metricbeat/module/elasticsearch/elasticsearch.go @@ -21,7 +21,10 @@ import ( "encoding/json" "fmt" "net/url" + "sync" + "time" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/metricbeat/helper" ) @@ -32,6 +35,9 @@ var clusterIDCache = map[string]string{} type Info struct { ClusterName string `json:"cluster_name"` ClusterID string `json:"cluster_uuid"` + Version struct { + Number string `json:"number"` + } `json:"version"` } // NodeInfo struct cotains data about the node @@ -167,3 +173,84 @@ func GetNodeInfo(http *helper.HTTP, uri string, nodeID string) (*NodeInfo, error } return nil, fmt.Errorf("no node matched id %s", nodeID) } + +// GetLicense returns license information. Since we don't expect license information +// to change frequently, the information is cached for 1 minute to avoid +// hitting Elasticsearch frequently +func GetLicense(http *helper.HTTP, resetURI string) (common.MapStr, error) { + // First, check the cache + license := licenseCache.get() + + // Not cached, fetch license from Elasticsearch + if license == nil { + content, err := fetchPath(http, resetURI, "_xpack/license") + if err != nil { + return nil, err + } + + err = json.Unmarshal(content, &license) + if err != nil { + return nil, err + } + + // Cache license for a minute + licenseCache.set(license, time.Minute) + } + + return licenseCache.get(), nil +} + +// GetClusterState returns cluster state information +func GetClusterState(http *helper.HTTP, resetURI string) (common.MapStr, error) { + content, err := fetchPath(http, resetURI, "_cluster/state/version,master_node,nodes,routing_table") + if err != nil { + return nil, err + } + + var clusterState map[string]interface{} + err = json.Unmarshal(content, &clusterState) + return clusterState, err +} + +// GetStackUsage returns stack usage information +func GetStackUsage(http *helper.HTTP, resetURI string) (common.MapStr, error) { + content, err := fetchPath(http, resetURI, "_xpack/usage") + if err != nil { + return nil, err + } + + var stackUsage map[string]interface{} + err = json.Unmarshal(content, &stackUsage) + return stackUsage, err +} + +// Global cache for license information. Assumption is that license information changes infrequently +var licenseCache = &_licenseCache{} + +type _licenseCache struct { + sync.RWMutex + license common.MapStr + cachedOn time.Time + ttl time.Duration +} + +func (c *_licenseCache) get() common.MapStr { + c.Lock() + defer c.Unlock() + + if time.Since(c.cachedOn) > c.ttl { + // We are past the TTL, so invalidate cache + c.license = nil + } + + return c.license +} + +func (c *_licenseCache) set(license common.MapStr, ttl time.Duration) { + c.Lock() + defer c.Unlock() + + c.license = license + c.ttl = ttl + c.cachedOn = time.Now() +}