Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion metricbeat/helper/elastic/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,13 @@ func MakeXPackMonitoringIndexName(product Product) string {
// ReportErrorForMissingField reports and returns an error message for the given
// field being missing in API response received from a given product.
//
// The error is built by MakeErrorForMissingField, handed to r.Error, and then
// returned so the caller can act on it as well.
func ReportErrorForMissingField(field string, product Product, r mb.ReporterV2) error {
	err := MakeErrorForMissingField(field, product)
	r.Error(err)
	return err
}

// MakeErrorForMissingField builds, without reporting it, the error used when
// the given field is absent from a stats API response of the given product.
// The product name is title-cased in the message.
func MakeErrorForMissingField(field string, product Product) error {
	productName := strings.Title(product.String())
	return fmt.Errorf("Could not find field '%v' in %v stats API response", field, productName)
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) {

// Not master, no event sent
if !isMaster {
logp.Debug("elasticsearch", "Trying to fetch index recovery stats from a non master node.")
logp.Debug("elasticsearch", "Trying to fetch cluster stats from a non master node.")
return
}

Expand All @@ -73,5 +73,9 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) {
return
}

eventMapping(r, content)
if m.MetricSet.XPack {
eventMappingXPack(r, m, content)
} else {
eventMapping(r, content)
}
}
239 changes: 239 additions & 0 deletions metricbeat/module/elasticsearch/cluster_stats/data_xpack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package cluster_stats

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't use an underscore in package name


import (
"encoding/json"
"fmt"
"hash/fnv"
"sort"
"strings"
"time"

"github.com/elastic/beats/metricbeat/helper/elastic"
"github.com/elastic/beats/metricbeat/module/elasticsearch"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/mb"
)

// passthruField copies the value found at fieldPath in sourceData to the same
// path in targetData.
//
// It returns a missing-field error (built by elastic.MakeErrorForMissingField)
// when sourceData has no value at fieldPath, the error returned by Put when
// the value cannot be stored in targetData, and nil on success.
func passthruField(fieldPath string, sourceData, targetData common.MapStr) error {
	fieldValue, err := sourceData.GetValue(fieldPath)
	if err != nil {
		return elastic.MakeErrorForMissingField(fieldPath, elastic.Elasticsearch)
	}

	// Surface a failed write instead of silently dropping the field.
	_, err = targetData.Put(fieldPath, fieldValue)
	return err
}

// clusterNeedsTLSEnabled reports whether TLS still has to be enabled on the
// transport protocol. That is the case only when all of the following hold:
// the license type is "trial", security is enabled, and transport SSL is not
// yet enabled. A missing field or a field of an unexpected type yields an error.
func clusterNeedsTLSEnabled(license, stackStats common.MapStr) (bool, error) {
	const (
		licenseTypePath  = "license.type"
		securityPath     = "security.enabled"
		transportTLSPath = "security.ssl.transport.enabled"
	)

	// Only trial licenses can require TLS to be turned on.
	rawType, err := license.GetValue(licenseTypePath)
	if err != nil {
		return false, elastic.MakeErrorForMissingField(licenseTypePath, elastic.Elasticsearch)
	}
	switch licenseType := rawType.(type) {
	case string:
		if licenseType != "trial" {
			return false, nil
		}
	default:
		return false, fmt.Errorf("license type is not a string")
	}

	// Without security there is nothing to protect with TLS.
	rawSecurity, err := stackStats.GetValue(securityPath)
	if err != nil {
		return false, elastic.MakeErrorForMissingField(securityPath, elastic.Elasticsearch)
	}
	switch securityEnabled := rawSecurity.(type) {
	case bool:
		if !securityEnabled {
			return false, nil
		}
	default:
		return false, fmt.Errorf("security enabled flag is not a boolean")
	}

	// TLS is needed exactly when it is not already enabled on the transport protocol.
	rawTransport, err := stackStats.GetValue(transportTLSPath)
	if err != nil {
		return false, elastic.MakeErrorForMissingField(transportTLSPath, elastic.Elasticsearch)
	}
	transportTLSEnabled, isBool := rawTransport.(bool)
	if !isBool {
		return false, fmt.Errorf("transport protocol SSL enabled flag is not a boolean")
	}

	return !transportTLSEnabled, nil
}

// computeNodesHash computes a simple hash value that can be used to determine if the nodes listing has changed since the last report.
func computeNodesHash(clusterState common.MapStr) (int32, error) {
Copy link
Contributor Author

@ycombinator ycombinator Aug 15, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value, err := clusterState.GetValue("nodes")
if err != nil {
return 0, elastic.MakeErrorForMissingField("nodes", elastic.Elasticsearch)
}

nodes, ok := value.(map[string]interface{})
if !ok {
return 0, fmt.Errorf("nodes is not a map")
}

var nodeEphemeralIDs []string
for _, value := range nodes {
nodeData, ok := value.(map[string]interface{})
if !ok {
return 0, fmt.Errorf("node data is not a map")
}

value, ok := nodeData["ephemeral_id"]
if !ok {
return 0, fmt.Errorf("node data does not contain ephemeral ID")
}

ephemeralID, ok := value.(string)
if !ok {
return 0, fmt.Errorf("node ephemeral ID is not a string")
}

nodeEphemeralIDs = append(nodeEphemeralIDs, ephemeralID)
}

sort.Strings(nodeEphemeralIDs)

combinedNodeEphemeralIDs := strings.Join(nodeEphemeralIDs, "")
return hash(combinedNodeEphemeralIDs), nil
}

// hash returns the 32-bit FNV-1 hash of s, reinterpreted as a signed integer.
func hash(s string) int32 {
	hasher := fnv.New32()
	hasher.Write([]byte(s))
	sum := hasher.Sum32()

	// The ES mapping for this value is a 32-bit *signed* integer, hence the conversion.
	return int32(sum)
}

// apmIndicesExist reports whether any index listed under
// "routing_table.indices" in clusterState has a name starting with "apm-".
// It returns an error when that field is missing or is not a map.
func apmIndicesExist(clusterState common.MapStr) (bool, error) {
	const indicesPath = "routing_table.indices"

	raw, err := clusterState.GetValue(indicesPath)
	if err != nil {
		return false, elastic.MakeErrorForMissingField(indicesPath, elastic.Elasticsearch)
	}

	indexNames, isMap := raw.(map[string]interface{})
	if !isMap {
		return false, fmt.Errorf("routing table indices is not a map")
	}

	found := false
	for indexName := range indexNames {
		if strings.HasPrefix(indexName, "apm-") {
			found = true
			break
		}
	}

	return found, nil
}

// eventMappingXPack builds one X-Pack monitoring document of type
// "cluster_stats" from content and hands it to r.
//
// content is decoded as JSON into a generic map. In addition, info, license,
// cluster state and stack usage are fetched through the elasticsearch helper
// functions using m.HTTP. The first failure (JSON decoding, a missing or
// mistyped field, or a failing helper call) is returned and no event is
// reported; otherwise the event is passed to r.Event and nil is returned.
func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error {
var data map[string]interface{}
err := json.Unmarshal(content, &data)
if err != nil {
return err
}

clusterStats := common.MapStr(data)

value, err := clusterStats.GetValue("cluster_name")
if err != nil {
return elastic.MakeErrorForMissingField("cluster_name", elastic.Elasticsearch)
}
clusterName, ok := value.(string)
if !ok {
return fmt.Errorf("cluster name is not a string")
}

info, err := elasticsearch.GetInfo(m.HTTP, m.HTTP.GetURI())
if err != nil {
return err
}

license, err := elasticsearch.GetLicense(m.HTTP, m.HTTP.GetURI())
if err != nil {
return err
}

clusterState, err := elasticsearch.GetClusterState(m.HTTP, m.HTTP.GetURI())
if err != nil {
return err
}

// Copy "status" from the cluster stats response into the cluster state.
if err = passthruField("status", clusterStats, clusterState); err != nil {
return err
}

// Store a hash of the node listing under "nodes_hash" in the cluster state.
nodesHash, err := computeNodesHash(clusterState)
if err != nil {
return err
}
// NOTE(review): anything returned by Put is discarded here — confirm that is intended.
clusterState.Put("nodes_hash", nodesHash)

usage, err := elasticsearch.GetStackUsage(m.HTTP, m.HTTP.GetURI())
if err != nil {
return err
}

clusterNeedsTLS, err := clusterNeedsTLSEnabled(license, usage)
if err != nil {
return err
}
// NOTE(review): GetLicense returns the map held in the package-level license
// cache, so this Put presumably writes into the cached map too — confirm that
// is intended.
license.Put("cluster_needs_tls", clusterNeedsTLS) // This powers a cluster alert for enabling TLS on the ES transport protocol

// This has to run before routing_table is deleted below, because
// apmIndicesExist reads routing_table.indices.
isAPMFound, err := apmIndicesExist(clusterState)
if err != nil {
return err
}
delete(clusterState, "routing_table") // We don't want to index the routing table in monitoring indices

stackStats := map[string]interface{}{
"xpack": usage,
"apm": map[string]interface{}{
"found": isAPMFound,
},
}

event := mb.Event{}
event.RootFields = common.MapStr{
"cluster_uuid": info.ClusterID,
"cluster_name": clusterName,
"timestamp": common.Time(time.Now()),
// The module's configured period divided by time.Millisecond.
"interval_ms": m.Module().Config().Period / time.Millisecond,
"type": "cluster_stats",
"license": license,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the full license response reported here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is the response from the GET _xpack/license Elasticsearch API, which looks like this:

{
  "license": {
    "status": "active",
    "uid": "f80c1fb5-75e9-4536-be2d-a768b02abb46",
    "type": "basic",
    "issue_date": "2018-08-06T23:47:09.619Z",
    "issue_date_in_millis": 1533599229619,
    "max_nodes": 1000,
    "issued_to": "elasticsearch",
    "issuer": "elasticsearch",
    "start_date_in_millis": -1
  }
}

"version": info.Version.Number,
"cluster_stats": clusterStats,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ruflin As you can see here, the document that is indexed into monitoring contains not only clusterStats but also clusterState (an abridged version of it) and stackStats.

For clusterState and stackStats we are simply calling the corresponding Elasticsearch APIs, doing a tiny bit of massaging of the response data, and then passing the resulting structure through over here. We are definitely not doing a complete schema conversion like we are doing for clusterStats.

It makes me wonder: should we treat clusterStats special over here? Or conversely, should we do complete schema conversions for clusterState and stackStats too?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is the internal reporting in ES happening? Is it just taking all the existing fields? If yes then we should do the same here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passing through cluster_stats as-is in 3b11529.

"cluster_state": clusterState,
"stack_stats": stackStats,
}

// Send the event to the index named by MakeXPackMonitoringIndexName for Elasticsearch.
event.Index = elastic.MakeXPackMonitoringIndexName(elastic.Elasticsearch)
r.Event(event)

return nil
}
87 changes: 87 additions & 0 deletions metricbeat/module/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ import (
"encoding/json"
"fmt"
"net/url"
"sync"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/helper"
)

Expand All @@ -32,6 +35,9 @@ var clusterIDCache = map[string]string{}
type Info struct {
// ClusterName is decoded from the "cluster_name" JSON key.
ClusterName string `json:"cluster_name"`
// ClusterID is decoded from the "cluster_uuid" JSON key.
ClusterID string `json:"cluster_uuid"`
// Version is decoded from the "version" JSON object; only its "number" key
// is mapped.
Version struct {
Number string `json:"number"`
} `json:"version"`
}

// NodeInfo struct contains data about the node
Expand Down Expand Up @@ -167,3 +173,84 @@ func GetNodeInfo(http *helper.HTTP, uri string, nodeID string) (*NodeInfo, error
}
return nil, fmt.Errorf("no node matched id %s", nodeID)
}

// GetLicense returns license information. Since we don't expect license information
// to change frequently, the information is cached for 1 minute to avoid
// hitting Elasticsearch frequently.
//
// On a cache miss the license is fetched from the "_xpack/license" path,
// decoded from JSON, stored in the cache and returned. A fetch or decode
// failure is returned as the error and nothing is cached.
//
// NOTE(review): the cache is a single package-level entry and is not keyed by
// resetURI — confirm that is acceptable when more than one cluster is monitored.
func GetLicense(http *helper.HTTP, resetURI string) (common.MapStr, error) {
	// First, check the cache
	if cached := licenseCache.get(); cached != nil {
		return cached, nil
	}

	// Not cached, fetch license from Elasticsearch
	content, err := fetchPath(http, resetURI, "_xpack/license")
	if err != nil {
		return nil, err
	}

	var license common.MapStr
	if err = json.Unmarshal(content, &license); err != nil {
		return nil, err
	}

	// Cache license for a minute
	licenseCache.set(license, time.Minute)

	// Return the value just decoded rather than reading the cache again: a
	// second read takes the lock once more and would yield nil if the entry
	// expired in between.
	return license, nil
}

// GetClusterState fetches the version, master_node, nodes and routing_table
// sections of the cluster state and returns them decoded from JSON.
func GetClusterState(http *helper.HTTP, resetURI string) (common.MapStr, error) {
	const statePath = "_cluster/state/version,master_node,nodes,routing_table"

	body, fetchErr := fetchPath(http, resetURI, statePath)
	if fetchErr != nil {
		return nil, fetchErr
	}

	var state map[string]interface{}
	decodeErr := json.Unmarshal(body, &state)

	// The map is returned alongside any decode error.
	return state, decodeErr
}

// GetStackUsage fetches the "_xpack/usage" path and returns the response
// decoded from JSON.
func GetStackUsage(http *helper.HTTP, resetURI string) (common.MapStr, error) {
	const usagePath = "_xpack/usage"

	body, fetchErr := fetchPath(http, resetURI, usagePath)
	if fetchErr != nil {
		return nil, fetchErr
	}

	var usage map[string]interface{}
	decodeErr := json.Unmarshal(body, &usage)

	// The map is returned alongside any decode error.
	return usage, decodeErr
}

// Global cache for license information. Assumption is that license information changes infrequently
var licenseCache = &_licenseCache{}

// _licenseCache holds a single license document together with the time it was
// stored and how long it stays valid.
type _licenseCache struct {
	// Mutex guards the fields below. A plain mutex is enough: both get (which
	// may clear an expired entry) and set modify state and take the exclusive
	// lock, so a read lock is never used.
	sync.Mutex
	license  common.MapStr // cached license document; nil when nothing valid is cached
	cachedOn time.Time     // when license was stored
	ttl      time.Duration // how long after cachedOn the entry stays valid
}

// get returns the cached license, or nil once more time than the TTL has
// passed since it was stored. An expired entry is cleared as a side effect.
func (c *_licenseCache) get() common.MapStr {
	c.Lock()
	defer c.Unlock()

	fresh := time.Since(c.cachedOn) <= c.ttl
	if !fresh {
		// Past the TTL: drop the stale entry so nil is returned below.
		c.license = nil
	}

	return c.license
}

// set stores license in the cache, valid for ttl counted from now.
func (c *_licenseCache) set(license common.MapStr, ttl time.Duration) {
	c.Lock()
	defer c.Unlock()

	c.cachedOn = time.Now()
	c.ttl = ttl
	c.license = license
}