From d1f4f3f024c5be75cfb41d44fd56dca2c7587638 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Mon, 24 Sep 2018 10:43:34 -0700 Subject: [PATCH] Adding x-pack monitoring code for elasticsearch/ccr metricset (#8336) This PR teaches the `elasticsearch/ccr` metricset to index documents into `.monitoring-es-6-*` indices when `xpack.enabled: true` is set in `modules.d/elasticsearch.yml`. ### To test this PR The idea is that metricbeat (specifically the elasticsearch/ccr metricset with `xpack.enabled: true`) will create exactly the same documents in `.monitoring-es-*` indices as Elasticsearch's internal collection and reporting does today. 1. Start up Elasticsearch (using the latest build from `master`). 2. Start up Kibana. 3. Start a trial license from the Kibana Management UI. 3. Enable Monitoring in Elasticsearch (via the cluster setting `xpack.monitoring.collection.enabled: true`). You can do this via [Elasticsearch's Cluster Update Settings API](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-update-settings.html) or by clicking the "Turn on Monitoring" button in the Monitoring UI in Kibana. 4. Set up CCR with a leader and follower index. 4. Let Elasticsearch run for ~20 seconds so a few documents are indexed into `.monitoring-es-6-*`. 6. From `.monitoring-es-6-*`, retrieve a document for `type = ccr_stats` 8. Turn off Elasticsearch's internal collection via the cluster setting `xpack.monitoring.elasticsearch.collection.enabled: false`. 7. Delete `.monitoring-es-6-*` indices. 10. Enable the `elasticsearch` module in metricbeat: `./metricbeat modules enable elasticsearch`. 11. In `modules.d/elasticsearch.yml`, add the `ccr` metricset and set `xpack.enabled: true`. Concretely, your `modules.d/elasticsearch.yml` should look something like this: ```yaml - module: elasticsearch metricsets: - ccr period: 10s hosts: ["http://localhost:9200"] #username: "user" #password: "secret" xpack.enabled: true ``` 12. Start metricbeat. 13. Let metricbeat run for ~20 seconds so a few documents are indexed into `.monitoring-es-*`. 14. Stop metricbeat 6. From `.monitoring-es-6-*`, retrieve a document for `type = ccr_stats` 16. Using a tool such as http://www.jsondiff.com, compare the documents indexed by Elasticsearch with those indexed by metricbeat. Verify that their structures are identical (same fields, not necessarily same values), except for these known and expected differences: 1. Only Metricbeat-indexed documents are expected to contain the fields `@timestamp`, `beat`, `host`, and `metricset`. These are "standard" fields added by beats and metricbeat and don't have an adverse impact since they are additive. 3. Only Elasticsearch-indexed documents are expected to contain the field `source_node`. This field is used for debugging purposes only and not actually consumed by either the Monitoring UI or Telemetry feature in Kibana. (cherry picked from commit 35068ee4fa34ad5bc9134babd5020d678a57329f) --- metricbeat/module/elasticsearch/ccr/ccr.go | 14 ++-- .../module/elasticsearch/ccr/data_xpack.go | 72 +++++++++++++++++++ 2 files changed, 81 insertions(+), 5 deletions(-) create mode 100644 metricbeat/module/elasticsearch/ccr/data_xpack.go diff --git a/metricbeat/module/elasticsearch/ccr/ccr.go b/metricbeat/module/elasticsearch/ccr/ccr.go index 5a551029054c..bf43825221a1 100644 --- a/metricbeat/module/elasticsearch/ccr/ccr.go +++ b/metricbeat/module/elasticsearch/ccr/ccr.go @@ -66,7 +66,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { return } - info, err := elasticsearch.GetInfo(m.HTTP, m.HostData().SanitizedURI) + info, err := elasticsearch.GetInfo(m.HTTP, m.HostData().SanitizedURI+ccrStatsPath) if err != nil { r.Error(err) return @@ -92,9 +92,13 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { return } - err = eventsMapping(r, *info, content) - if err != nil { - r.Error(err) - return + if m.XPack { + eventsMappingXPack(r, m, *info, content) + } else { + err = eventsMapping(r, *info, content) + if err != nil { + r.Error(err) + return + } } } diff --git a/metricbeat/module/elasticsearch/ccr/data_xpack.go b/metricbeat/module/elasticsearch/ccr/data_xpack.go new file mode 100644 index 000000000000..80dbb92bb76a --- /dev/null +++ b/metricbeat/module/elasticsearch/ccr/data_xpack.go @@ -0,0 +1,72 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package ccr + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/joeshaw/multierror" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/metricbeat/helper/elastic" + "github.com/elastic/beats/metricbeat/mb" + "github.com/elastic/beats/metricbeat/module/elasticsearch" +) + +func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, content []byte) error { + var data map[string]interface{} + err := json.Unmarshal(content, &data) + if err != nil { + r.Error(err) + return err + } + + var errors multierror.Errors + for _, followerShards := range data { + + shards, ok := followerShards.([]interface{}) + if !ok { + err := fmt.Errorf("shards is not an array") + errors = append(errors, err) + continue + } + + for _, s := range shards { + shard, ok := s.(map[string]interface{}) + if !ok { + err := fmt.Errorf("shard is not an object") + errors = append(errors, err) + continue + } + event := mb.Event{} + event.RootFields = common.MapStr{ + "cluster_uuid": info.ClusterID, + "timestamp": common.Time(time.Now()), + "interval_ms": m.Module().Config().Period / time.Millisecond, + "type": "ccr_stats", + "ccr_stats": shard, + } + + event.Index = elastic.MakeXPackMonitoringIndexName(elastic.Elasticsearch) + r.Event(event) + } + } + return errors.Err() +}