diff --git a/metricbeat/module/elasticsearch/elasticsearch_integration_test.go b/metricbeat/module/elasticsearch/elasticsearch_integration_test.go index f670a7bd615b..3ba0699589b7 100644 --- a/metricbeat/module/elasticsearch/elasticsearch_integration_test.go +++ b/metricbeat/module/elasticsearch/elasticsearch_integration_test.go @@ -31,6 +31,7 @@ import ( "github.com/pkg/errors" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/tests/compose" @@ -62,33 +63,32 @@ var metricSets = []string{ "shard", } +var xpackMetricSets = []string{ + "ccr", + "enrich", + "cluster_stats", + "index", + "index_recovery", + "index_summary", + "ml_job", + "node_stats", + "shard", +} + func TestFetch(t *testing.T) { service := compose.EnsureUpWithTimeout(t, 300, "elasticsearch") - host := service.Host() - err := createIndex(host) - assert.NoError(t, err) version, err := getElasticsearchVersion(host) if err != nil { t.Fatal("getting elasticsearch version", err) } - err = enableTrialLicense(host, version) - assert.NoError(t, err) - - err = createMLJob(host, version) - assert.NoError(t, err) - - err = createCCRStats(host) - assert.NoError(t, err) - - err = createEnrichStats(host) - assert.NoError(t, err) + setupTest(t, host, version) for _, metricSet := range metricSets { - checkSkip(t, metricSet, version) t.Run(metricSet, func(t *testing.T) { + checkSkip(t, metricSet, version) f := mbtest.NewReportingMetricSetV2Error(t, getConfig(metricSet, host)) events, errs := mbtest.ReportingFetchV2Error(f) @@ -104,7 +104,6 @@ func TestFetch(t *testing.T) { func TestData(t *testing.T) { service := compose.EnsureUpWithTimeout(t, 300, "elasticsearch") - host := service.Host() version, err := getElasticsearchVersion(host) @@ -113,8 +112,8 @@ func TestData(t *testing.T) { } for _, metricSet := range metricSets { - checkSkip(t, metricSet, version) t.Run(metricSet, func(t *testing.T) { + checkSkip(t, metricSet, version) f := mbtest.NewReportingMetricSetV2Error(t, getConfig(metricSet, host)) err := mbtest.WriteEventsReporterV2Error(f, t, metricSet) if err != nil { @@ -124,6 +123,76 @@ func TestData(t *testing.T) { } } +func TestXPackEnabled(t *testing.T) { + service := compose.EnsureUpWithTimeout(t, 300, "elasticsearch") + host := service.Host() + + version, err := getElasticsearchVersion(host) + require.NoError(t, err) + + setupTest(t, host, version) + + metricSetToTypesMap := map[string][]string{ + "ccr": []string{"ccr_stats", "ccr_auto_follow_stats"}, + "cluster_stats": []string{"cluster_stats"}, + "enrich": []string{"enrich_coordinator_stats"}, + "index_recovery": []string{"index_recovery"}, + "index_summary": []string{"indices_stats"}, + "ml_job": []string{"job_stats"}, + "node_stats": []string{"node_stats"}, + } + + config := getXPackConfig(host) + + metricSets := mbtest.NewReportingMetricSetV2Errors(t, config) + for _, metricSet := range metricSets { + t.Run(metricSet.Name(), func(t *testing.T) { + checkSkip(t, metricSet.Name(), version) + events, errs := mbtest.ReportingFetchV2Error(metricSet) + require.Empty(t, errs) + require.NotEmpty(t, events) + + // Special case: the `index` metricset generates as many events + // as there are distinct indices in Elasticsearch + if metricSet.Name() == "index" { + numIndices, err := countIndices(host) + require.NoError(t, err) + require.Len(t, events, numIndices) + + for _, event := range events { + require.Equal(t, "index_stats", event.RootFields["type"]) + require.Regexp(t, `^.monitoring-es-\d-mb`, event.Index) + } + + return + } + + // Special case: the `shard` metricset generates as many events + // as there are distinct shards in Elasticsearch + if metricSet.Name() == "shard" { + numShards, err := countShards(host) + require.NoError(t, err) + require.Len(t, events, numShards) + + for _, event := range events { + require.Equal(t, "shards", event.RootFields["type"]) + require.Regexp(t, `^.monitoring-es-\d-mb`, event.Index) + } + + return + } + + types := metricSetToTypesMap[metricSet.Name()] + require.Len(t, events, len(types)) + + for i, event := range events { + require.Equal(t, types[i], event.RootFields["type"]) + require.Regexp(t, `^.monitoring-es-\d-mb`, event.Index) + } + }) + } +} + // GetConfig returns config for elasticsearch module func getConfig(metricset string, host string) map[string]interface{} { return map[string]interface{}{ @@ -134,6 +203,32 @@ func getConfig(metricset string, host string) map[string]interface{} { } } +func getXPackConfig(host string) map[string]interface{} { + return map[string]interface{}{ + "module": elasticsearch.ModuleName, + "metricsets": xpackMetricSets, + "hosts": []string{host}, + "xpack.enabled": true, + } +} + +func setupTest(t *testing.T, esHost string, esVersion *common.Version) { + err := createIndex(esHost) + require.NoError(t, err) + + err = enableTrialLicense(esHost, esVersion) + require.NoError(t, err) + + err = createMLJob(esHost, esVersion) + require.NoError(t, err) + + err = createCCRStats(esHost) + require.NoError(t, err) + + err = createEnrichStats(esHost) + require.NoError(t, err) +} + // createIndex creates and elasticsearch index in case it does not exit yet func createIndex(host string) error { client := &http.Client{} @@ -451,6 +546,36 @@ func ingestAndEnrichDoc(host string) error { return err } +func countIndices(elasticsearchHostPort string) (int, error) { + return countCatItems(elasticsearchHostPort, "indices") + +} + +func countShards(elasticsearchHostPort string) (int, error) { + return countCatItems(elasticsearchHostPort, "shards") +} + +func countCatItems(elasticsearchHostPort, catObject string) (int, error) { + resp, err := http.Get("http://" + elasticsearchHostPort + "/_cat/" + catObject + "?format=json") + if err != nil { + return 0, err + } + defer resp.Body.Close() + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return 0, err + } + + var data []common.MapStr + err = json.Unmarshal(body, &data) + if err != nil { + return 0, err + } + + return len(data), nil +} + func checkSkip(t *testing.T, metricset string, version *common.Version) { checkSkipFeature := func(name string, availableVersion *common.Version) { isAPIAvailable := elastic.IsFeatureAvailable(version, availableVersion)