Skip to content

Commit 35068ee

Browse files
authored
Adding x-pack monitoring code for elasticsearch/ccr metricset (#8336)
This PR teaches the `elasticsearch/ccr` metricset to index documents into `.monitoring-es-6-*` indices when `xpack.enabled: true` is set in `modules.d/elasticsearch.yml`. ### To test this PR The idea is that metricbeat (specifically the elasticsearch/ccr metricset with `xpack.enabled: true`) will create exactly the same documents in `.monitoring-es-*` indices as Elasticsearch's internal collection and reporting does today. 1. Start up Elasticsearch (using the latest build from `master`). 2. Start up Kibana. 3. Start a trial license from the Kibana Management UI. 3. Enable Monitoring in Elasticsearch (via the cluster setting `xpack.monitoring.collection.enabled: true`). You can do this via [Elasticsearch's Cluster Update Settings API](https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-update-settings.html) or by clicking the "Turn on Monitoring" button in the Monitoring UI in Kibana. 4. Set up CCR with a leader and follower index. 4. Let Elasticsearch run for ~20 seconds so a few documents are indexed into `.monitoring-es-6-*`. 6. From `.monitoring-es-6-*`, retrieve a document for `type = ccr_stats` 8. Turn off Elasticsearch's internal collection via the cluster setting `xpack.monitoring.elasticsearch.collection.enabled: false`. 7. Delete `.monitoring-es-6-*` indices. 10. Enable the `elasticsearch` module in metricbeat: `./metricbeat modules enable elasticsearch`. 11. In `modules.d/elasticsearch.yml`, add the `ccr` metricset and set `xpack.enabled: true`. Concretely, your `modules.d/elasticsearch.yml` should look something like this: ```yaml - module: elasticsearch metricsets: - ccr period: 10s hosts: ["http://localhost:9200"] #username: "user" #password: "secret" xpack.enabled: true ``` 12. Start metricbeat. 13. Let metricbeat run for ~20 seconds so a few documents are indexed into `.monitoring-es-*`. 14. Stop metricbeat 6. From `.monitoring-es-6-*`, retrieve a document for `type = ccr_stats` 16. Using a tool such as http://www.jsondiff.com, compare the documents indexed by Elasticsearch with those indexed by metricbeat. Verify that their structures are identical (same fields, not necessarily same values), except for these known and expected differences: 1. Only Metricbeat-indexed documents are expected to contain the fields `@timestamp`, `beat`, `host`, and `metricset`. These are "standard" fields added by beats and metricbeat and don't have an adverse impact since they are additive. 3. Only Elasticsearch-indexed documents are expected to contain the field `source_node`. This field is used for debugging purposes only and not actually consumed by either the Monitoring UI or Telemetry feature in Kibana.
1 parent e1ca577 commit 35068ee

File tree

2 files changed

+81
-5
lines changed

2 files changed

+81
-5
lines changed

metricbeat/module/elasticsearch/ccr/ccr.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) {
6666
return
6767
}
6868

69-
info, err := elasticsearch.GetInfo(m.HTTP, m.HostData().SanitizedURI)
69+
info, err := elasticsearch.GetInfo(m.HTTP, m.HostData().SanitizedURI+ccrStatsPath)
7070
if err != nil {
7171
r.Error(err)
7272
return
@@ -92,9 +92,13 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) {
9292
return
9393
}
9494

95-
err = eventsMapping(r, *info, content)
96-
if err != nil {
97-
r.Error(err)
98-
return
95+
if m.XPack {
96+
eventsMappingXPack(r, m, *info, content)
97+
} else {
98+
err = eventsMapping(r, *info, content)
99+
if err != nil {
100+
r.Error(err)
101+
return
102+
}
99103
}
100104
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package ccr
19+
20+
import (
21+
"encoding/json"
22+
"fmt"
23+
"time"
24+
25+
"github.com/joeshaw/multierror"
26+
27+
"github.com/elastic/beats/libbeat/common"
28+
"github.com/elastic/beats/metricbeat/helper/elastic"
29+
"github.com/elastic/beats/metricbeat/mb"
30+
"github.com/elastic/beats/metricbeat/module/elasticsearch"
31+
)
32+
33+
func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, content []byte) error {
34+
var data map[string]interface{}
35+
err := json.Unmarshal(content, &data)
36+
if err != nil {
37+
r.Error(err)
38+
return err
39+
}
40+
41+
var errors multierror.Errors
42+
for _, followerShards := range data {
43+
44+
shards, ok := followerShards.([]interface{})
45+
if !ok {
46+
err := fmt.Errorf("shards is not an array")
47+
errors = append(errors, err)
48+
continue
49+
}
50+
51+
for _, s := range shards {
52+
shard, ok := s.(map[string]interface{})
53+
if !ok {
54+
err := fmt.Errorf("shard is not an object")
55+
errors = append(errors, err)
56+
continue
57+
}
58+
event := mb.Event{}
59+
event.RootFields = common.MapStr{
60+
"cluster_uuid": info.ClusterID,
61+
"timestamp": common.Time(time.Now()),
62+
"interval_ms": m.Module().Config().Period / time.Millisecond,
63+
"type": "ccr_stats",
64+
"ccr_stats": shard,
65+
}
66+
67+
event.Index = elastic.MakeXPackMonitoringIndexName(elastic.Elasticsearch)
68+
r.Event(event)
69+
}
70+
}
71+
return errors.Err()
72+
}

0 commit comments

Comments
 (0)