diff --git a/metricbeat/module/elasticsearch/elasticsearch_integration_test.go b/metricbeat/module/elasticsearch/elasticsearch_integration_test.go index a41ed72cd0b2..5647311d037e 100644 --- a/metricbeat/module/elasticsearch/elasticsearch_integration_test.go +++ b/metricbeat/module/elasticsearch/elasticsearch_integration_test.go @@ -29,6 +29,7 @@ import ( "os" "testing" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/elastic/beats/libbeat/common" @@ -78,8 +79,8 @@ func TestFetch(t *testing.T) { for _, metricSet := range metricSets { checkSkip(t, metricSet, host) t.Run(metricSet, func(t *testing.T) { - f := mbtest.NewReportingMetricSetV2(t, getConfig(metricSet)) - events, errs := mbtest.ReportingFetchV2(f) + f := mbtest.NewReportingMetricSetV2Error(t, getConfig(metricSet)) + events, errs := mbtest.ReportingFetchV2Error(f) assert.Empty(t, errs) if !assert.NotEmpty(t, events) { @@ -98,8 +99,8 @@ func TestData(t *testing.T) { for _, metricSet := range metricSets { checkSkip(t, metricSet, host) t.Run(metricSet, func(t *testing.T) { - f := mbtest.NewReportingMetricSetV2(t, getConfig(metricSet)) - err := mbtest.WriteEventsReporterV2(f, t, metricSet) + f := mbtest.NewReportingMetricSetV2Error(t, getConfig(metricSet)) + err := mbtest.WriteEventsReporterV2Error(f, t, metricSet) if err != nil { t.Fatal("write", err) } @@ -226,12 +227,48 @@ func createCCRStats(host string) error { err = createCCRFollowerIndex(host) if err != nil { - return err + return errors.Wrap(err, "error creating CCR follower index") + } + + // Give ES sufficient time to do the replication and produce stats + checkCCRStats := func() (bool, error) { + return checkCCRStatsExists(host) + } + + exists, err := waitForSuccess(checkCCRStats, 300, 5) + if err != nil { + return errors.Wrap(err, "error checking if CCR stats exist") + } + + if !exists { + return fmt.Errorf("expected to find CCR stats but not found") } return nil } +func checkCCRStatsExists(host string) (bool, error) { + resp, err := http.Get("http://" + host + "/_ccr/stats") + if err != nil { + return false, err + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return false, err + } + var data struct { + FollowStats struct { + Indices []map[string]interface{} `json:"indices"` + } `json:"follow_stats"` + } + err = json.Unmarshal(body, &data) + if err != nil { + return false, err + } + return len(data.FollowStats.Indices) > 0, nil +} + func setupCCRRemote(host string) error { remoteSettings, err := ioutil.ReadFile("ccr/_meta/test/test_remote_settings.json") if err != nil {