diff --git a/metricbeat/module/elasticsearch/ccr/ccr.go b/metricbeat/module/elasticsearch/ccr/ccr.go index 5a551029054c..bf43825221a1 100644 --- a/metricbeat/module/elasticsearch/ccr/ccr.go +++ b/metricbeat/module/elasticsearch/ccr/ccr.go @@ -66,7 +66,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { return } - info, err := elasticsearch.GetInfo(m.HTTP, m.HostData().SanitizedURI) + info, err := elasticsearch.GetInfo(m.HTTP, m.HostData().SanitizedURI+ccrStatsPath) if err != nil { r.Error(err) return @@ -92,9 +92,13 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { return } - err = eventsMapping(r, *info, content) - if err != nil { - r.Error(err) - return + if m.XPack { + eventsMappingXPack(r, m, *info, content) + } else { + err = eventsMapping(r, *info, content) + if err != nil { + r.Error(err) + return + } } } diff --git a/metricbeat/module/elasticsearch/ccr/data_xpack.go b/metricbeat/module/elasticsearch/ccr/data_xpack.go new file mode 100644 index 000000000000..80dbb92bb76a --- /dev/null +++ b/metricbeat/module/elasticsearch/ccr/data_xpack.go @@ -0,0 +1,72 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package ccr + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/joeshaw/multierror" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/metricbeat/helper/elastic" + "github.com/elastic/beats/metricbeat/mb" + "github.com/elastic/beats/metricbeat/module/elasticsearch" +) + +func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, info elasticsearch.Info, content []byte) error { + var data map[string]interface{} + err := json.Unmarshal(content, &data) + if err != nil { + r.Error(err) + return err + } + + var errors multierror.Errors + for _, followerShards := range data { + + shards, ok := followerShards.([]interface{}) + if !ok { + err := fmt.Errorf("shards is not an array") + errors = append(errors, err) + continue + } + + for _, s := range shards { + shard, ok := s.(map[string]interface{}) + if !ok { + err := fmt.Errorf("shard is not an object") + errors = append(errors, err) + continue + } + event := mb.Event{} + event.RootFields = common.MapStr{ + "cluster_uuid": info.ClusterID, + "timestamp": common.Time(time.Now()), + "interval_ms": m.Module().Config().Period / time.Millisecond, + "type": "ccr_stats", + "ccr_stats": shard, + } + + event.Index = elastic.MakeXPackMonitoringIndexName(elastic.Elasticsearch) + r.Event(event) + } + } + return errors.Err() +}