Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 142 additions & 17 deletions metricbeat/module/elasticsearch/elasticsearch_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pkg/errors"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/tests/compose"
Expand Down Expand Up @@ -62,33 +63,32 @@ var metricSets = []string{
"shard",
}

var xpackMetricSets = []string{
"ccr",
"enrich",
"cluster_stats",
"index",
"index_recovery",
"index_summary",
"ml_job",
"node_stats",
"shard",
}

func TestFetch(t *testing.T) {
service := compose.EnsureUpWithTimeout(t, 300, "elasticsearch")

host := service.Host()
err := createIndex(host)
assert.NoError(t, err)

version, err := getElasticsearchVersion(host)
if err != nil {
t.Fatal("getting elasticsearch version", err)
}

err = enableTrialLicense(host, version)
assert.NoError(t, err)

err = createMLJob(host, version)
assert.NoError(t, err)

err = createCCRStats(host)
assert.NoError(t, err)

err = createEnrichStats(host)
assert.NoError(t, err)
setupTest(t, host, version)

for _, metricSet := range metricSets {
checkSkip(t, metricSet, version)
t.Run(metricSet, func(t *testing.T) {
checkSkip(t, metricSet, version)
f := mbtest.NewReportingMetricSetV2Error(t, getConfig(metricSet, host))
events, errs := mbtest.ReportingFetchV2Error(f)

Expand All @@ -104,7 +104,6 @@ func TestFetch(t *testing.T) {

func TestData(t *testing.T) {
service := compose.EnsureUpWithTimeout(t, 300, "elasticsearch")

host := service.Host()

version, err := getElasticsearchVersion(host)
Expand All @@ -113,8 +112,8 @@ func TestData(t *testing.T) {
}

for _, metricSet := range metricSets {
checkSkip(t, metricSet, version)
t.Run(metricSet, func(t *testing.T) {
checkSkip(t, metricSet, version)
f := mbtest.NewReportingMetricSetV2Error(t, getConfig(metricSet, host))
err := mbtest.WriteEventsReporterV2Error(f, t, metricSet)
if err != nil {
Expand All @@ -124,6 +123,76 @@ func TestData(t *testing.T) {
}
}

func TestXPackEnabled(t *testing.T) {
service := compose.EnsureUpWithTimeout(t, 300, "elasticsearch")
host := service.Host()

version, err := getElasticsearchVersion(host)
require.NoError(t, err)

setupTest(t, host, version)

metricSetToTypesMap := map[string][]string{
"ccr": []string{"ccr_stats", "ccr_auto_follow_stats"},
"cluster_stats": []string{"cluster_stats"},
"enrich": []string{"enrich_coordinator_stats"},
"index_recovery": []string{"index_recovery"},
"index_summary": []string{"indices_stats"},
"ml_job": []string{"job_stats"},
"node_stats": []string{"node_stats"},
}

config := getXPackConfig(host)

metricSets := mbtest.NewReportingMetricSetV2Errors(t, config)
for _, metricSet := range metricSets {
t.Run(metricSet.Name(), func(t *testing.T) {
checkSkip(t, metricSet.Name(), version)
events, errs := mbtest.ReportingFetchV2Error(metricSet)
require.Empty(t, errs)
require.NotEmpty(t, events)

// Special case: the `index` metricset generates as many events
// as there are distinct indices in Elasticsearch
if metricSet.Name() == "index" {
numIndices, err := countIndices(host)
require.NoError(t, err)
require.Len(t, events, numIndices)

for _, event := range events {
require.Equal(t, "index_stats", event.RootFields["type"])
require.Regexp(t, `^.monitoring-es-\d-mb`, event.Index)
}

return
}

// Special case: the `shard` metricset generates as many events
// as there are distinct shards in Elasticsearch
if metricSet.Name() == "shard" {
numShards, err := countShards(host)
require.NoError(t, err)
require.Len(t, events, numShards)

for _, event := range events {
require.Equal(t, "shards", event.RootFields["type"])
require.Regexp(t, `^.monitoring-es-\d-mb`, event.Index)
}

return
}

types := metricSetToTypesMap[metricSet.Name()]
require.Len(t, events, len(types))

for i, event := range events {
require.Equal(t, types[i], event.RootFields["type"])
require.Regexp(t, `^.monitoring-es-\d-mb`, event.Index)
}
})
}
}

// GetConfig returns config for elasticsearch module
func getConfig(metricset string, host string) map[string]interface{} {
return map[string]interface{}{
Expand All @@ -134,6 +203,32 @@ func getConfig(metricset string, host string) map[string]interface{} {
}
}

func getXPackConfig(host string) map[string]interface{} {
return map[string]interface{}{
"module": elasticsearch.ModuleName,
"metricsets": xpackMetricSets,
"hosts": []string{host},
"xpack.enabled": true,
}
}

func setupTest(t *testing.T, esHost string, esVersion *common.Version) {
err := createIndex(esHost)
require.NoError(t, err)

err = enableTrialLicense(esHost, esVersion)
require.NoError(t, err)

err = createMLJob(esHost, esVersion)
require.NoError(t, err)

err = createCCRStats(esHost)
require.NoError(t, err)

err = createEnrichStats(esHost)
require.NoError(t, err)
}

// createIndex creates and elasticsearch index in case it does not exit yet
func createIndex(host string) error {
client := &http.Client{}
Expand Down Expand Up @@ -451,6 +546,36 @@ func ingestAndEnrichDoc(host string) error {
return err
}

func countIndices(elasticsearchHostPort string) (int, error) {
return countCatItems(elasticsearchHostPort, "indices")

}

func countShards(elasticsearchHostPort string) (int, error) {
return countCatItems(elasticsearchHostPort, "shards")
}

func countCatItems(elasticsearchHostPort, catObject string) (int, error) {
resp, err := http.Get("http://" + elasticsearchHostPort + "/_cat/" + catObject + "?format=json")
if err != nil {
return 0, err
}
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return 0, err
}

var data []common.MapStr
err = json.Unmarshal(body, &data)
if err != nil {
return 0, err
}

return len(data), nil
}

func checkSkip(t *testing.T, metricset string, version *common.Version) {
checkSkipFeature := func(name string, availableVersion *common.Version) {
isAPIAvailable := elastic.IsFeatureAvailable(version, availableVersion)
Expand Down