diff --git a/.ci/Jenkinsfile b/.ci/Jenkinsfile index 06f7f606e2..086cc919f9 100644 --- a/.ci/Jenkinsfile +++ b/.ci/Jenkinsfile @@ -26,8 +26,7 @@ pipeline { parameters { choice(name: 'runTestsSuite', choices: ['all', 'helm', 'ingest-manager', 'metricbeat'], description: 'Choose which test suite to run (default: all)') choice(name: 'LOG_LEVEL', choices: ['INFO', 'DEBUG'], description: 'Log level to be used') - choice(name: 'QUERY_MAX_ATTEMPTS', choices: ['5', '10', '20'], description: 'Number of attempts to create the connection to Elasticsearch') - choice(name: 'RETRY_TIMEOUT', choices: ['3', '5', '7', '11'], description: 'Number of seconds between retry') + choice(name: 'RETRY_TIMEOUT', choices: ['3', '5', '7', '11'], description: 'Max number of minutes for timeout backoff strategies') string(name: 'STACK_VERSION', defaultValue: '7.7.0', description: 'SemVer version of the stack to be used.') string(name: 'METRICBEAT_VERSION', defaultValue: '7.7.0', description: 'SemVer version of the metricbeat to be used.') string(name: 'HELM_CHART_VERSION', defaultValue: '7.6.1', description: 'SemVer version of Helm chart to be used.') @@ -54,7 +53,6 @@ pipeline { KIND_VERSION = "${params.KIND_VERSION.trim()}" KUBERNETES_VERSION = "${params.KUBERNETES_VERSION.trim()}" LOG_LEVEL = "${params.LOG_LEVEL.trim()}" - QUERY_MAX_ATTEMPTS = "${params.QUERY_MAX_ATTEMPTS.trim()}" RETRY_TIMEOUT = "${params.RETRY_TIMEOUT.trim()}" } stages { diff --git a/e2e/Makefile b/e2e/Makefile index 51b1ae942c..1cc6f1704d 100644 --- a/e2e/Makefile +++ b/e2e/Makefile @@ -3,7 +3,6 @@ FEATURE?= FORMAT?=pretty LOG_INCLUDE_TIMESTAMP?=TRUE LOG_LEVEL?=INFO -QUERY_MAX_ATTEMPTS?=5 RETRY_TIMEOUT?=3 STACK_VERSION?= METRICBEAT_VERSION?= @@ -37,7 +36,6 @@ functional-test: install-godog cd _suites/${SUITE} && \ OP_LOG_LEVEL=${LOG_LEVEL} \ OP_LOG_INCLUDE_TIMESTAMP=${LOG_INCLUDE_TIMESTAMP} \ - OP_QUERY_MAX_ATTEMPTS=${QUERY_MAX_ATTEMPTS} \ OP_RETRY_TIMEOUT=${RETRY_TIMEOUT} \ OP_METRICBEAT_VERSION=${METRICBEAT_VERSION} \ OP_STACK_VERSION=${STACK_VERSION} \ diff --git a/e2e/README.md b/e2e/README.md index a5b635e0f7..32572ca3c2 100644 --- a/e2e/README.md +++ b/e2e/README.md @@ -130,13 +130,11 @@ where: There are some environment variables you can use to improve the experience running the tests with `Make`. - **METRICBEAT_FETCH_TIMEOUT** (default: 20). This is the time in seconds we leave metricbeat grabbing metrics from the monitored integration module. -- **QUERY_MAX_ATTEMPTS** (default: 5). It's possible that the Elasticsearch is not ready for writes, so we can define a retry strategy to wait for our index to be ready. This variable defines the number of attempts the retry process will happen. -- **RETRY_TIMEOUT** (default: 3). For same reason as above, this variable defines the delay between attempts, before a successful connection to Elasticsearch is made. +- **RETRY_TIMEOUT** (default: 3 minutes). It's possible that the Elasticsearch is not ready for writes, so we can define a retry strategy to wait for our index to be ready. This variable defines the number of minutes the retry process will use as max timeout, where the implementation code will try using a backoff strategy until a condition is met. >Interested in running the tests directly using Godog? Please check out [the Makefile](./Makefile#L19). ```shell -export OP_QUERY_MAX_ATTEMPTS=${OP_QUERY_MAX_ATTEMPTS:-5} export OP_RETRY_TIMEOUT=${OP_RETRY_TIMEOUT:-3} export FORMAT=${FORMAT:-pretty} # valid formats are: pretty, junit # If you do not pass a '-t moduleName' argument, then all tests will be run diff --git a/e2e/_suites/ingest-manager/features/ingest_manager.feature b/e2e/_suites/ingest-manager/features/ingest_manager.feature index a09d7b6a47..4217f80cfa 100644 --- a/e2e/_suites/ingest-manager/features/ingest_manager.feature +++ b/e2e/_suites/ingest-manager/features/ingest_manager.feature @@ -18,5 +18,5 @@ Scenario: Deploying a stand-alone agent @stop-agent Scenario: Stopping the agent container stops data going into ES Given a stand-alone agent is deployed - When the "agent" docker container is stopped + When the "elastic-agent" docker container is stopped Then there is no new data in the index after agent shuts down diff --git a/e2e/_suites/ingest-manager/ingest-manager_test.go b/e2e/_suites/ingest-manager/ingest-manager_test.go index 9f892906e6..c18a3ac1c3 100644 --- a/e2e/_suites/ingest-manager/ingest-manager_test.go +++ b/e2e/_suites/ingest-manager/ingest-manager_test.go @@ -21,12 +21,17 @@ var stackVersion = "8.0.0-SNAPSHOT" // affecting the runtime dependencies (or profile) var profileEnv map[string]string +// queryRetryTimeout is the number of seconds between elasticsearch retry queries. +// It can be overriden by OP_RETRY_TIMEOUT env var +var queryRetryTimeout = 3 + // All URLs running on localhost as Kibana is expected to be exposed there const kibanaBaseURL = "http://localhost:5601" func init() { config.Init() + queryRetryTimeout = e2e.GetIntegerFromEnv("OP_RETRY_TIMEOUT", queryRetryTimeout) stackVersion = e2e.GetEnv("OP_STACK_VERSION", stackVersion) } @@ -75,6 +80,8 @@ func IngestManagerFeatureContext(s *godog.Suite) { "minutes": minutesToBeHealthy, }).Error("The Kibana instance could not get the healthy status") } + + imts.StandAlone.RuntimeDependenciesStartDate = time.Now() }) s.BeforeScenario(func(*messages.Pickle) { log.Debug("Before Ingest Manager scenario") diff --git a/e2e/_suites/ingest-manager/stand-alone.go b/e2e/_suites/ingest-manager/stand-alone.go index c98ccb2ce0..275338e349 100644 --- a/e2e/_suites/ingest-manager/stand-alone.go +++ b/e2e/_suites/ingest-manager/stand-alone.go @@ -1,7 +1,11 @@ package main import ( + "context" + "time" + "github.com/cucumber/godog" + "github.com/elastic/e2e-testing/cli/docker" "github.com/elastic/e2e-testing/cli/services" "github.com/elastic/e2e-testing/e2e" log "github.com/sirupsen/logrus" @@ -11,6 +15,10 @@ import ( type StandAloneTestSuite struct { AgentConfigFilePath string Cleanup bool + Hostname string + // date controls for queries + AgentStoppedDate time.Time + RuntimeDependenciesStartDate time.Time } func (sats *StandAloneTestSuite) contributeSteps(s *godog.Suite) { @@ -44,6 +52,13 @@ func (sats *StandAloneTestSuite) aStandaloneAgentIsDeployed() error { return err } + // get container hostname once + hostname, err := getContainerHostname(serviceName) + if err != nil { + return err + } + + sats.Hostname = hostname sats.Cleanup = true if log.IsLevelEnabled(log.DebugLevel) { @@ -66,13 +81,156 @@ func (sats *StandAloneTestSuite) aStandaloneAgentIsDeployed() error { } func (sats *StandAloneTestSuite) thereIsNewDataInTheIndexFromAgent() error { - return godog.ErrPending + maxTimeout := time.Duration(queryRetryTimeout) * time.Minute + minimumHitsCount := 100 + + result, err := searchAgentData(sats.Hostname, sats.RuntimeDependenciesStartDate, minimumHitsCount, maxTimeout) + if err != nil { + return err + } + + log.Debugf("Search result: %v", result) + + return e2e.AssertHitsArePresent(result) } -func (sats *StandAloneTestSuite) theDockerContainerIsStopped(arg1 string) error { - return godog.ErrPending +func (sats *StandAloneTestSuite) theDockerContainerIsStopped(serviceName string) error { + serviceManager := services.NewServiceManager() + + err := serviceManager.RemoveServicesFromCompose("ingest-manager", []string{serviceName}, profileEnv) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + "service": serviceName, + }).Error("Could not stop the service.") + + return err + } + sats.AgentStoppedDate = time.Now() + + return nil } func (sats *StandAloneTestSuite) thereIsNoNewDataInTheIndexAfterAgentShutsDown() error { - return godog.ErrPending + maxTimeout := time.Duration(30) * time.Second + minimumHitsCount := 1 + + result, err := searchAgentData(sats.Hostname, sats.AgentStoppedDate, minimumHitsCount, maxTimeout) + if err != nil { + log.WithFields(log.Fields{ + "error": err, + }).Info("No documents were found for the Agent in the index after it stopped") + return nil + } + + return e2e.AssertHitsAreNotPresent(result) +} + +// we need the container name because we use the Docker Client instead of Docker Compose +func getContainerHostname(serviceName string) (string, error) { + containerName := "ingest-manager_" + serviceName + "_1" + + log.WithFields(log.Fields{ + "service": serviceName, + "containerName": containerName, + }).Debug("Retrieving container name from the Docker client") + + hostname, err := docker.ExecCommandIntoContainer(context.Background(), containerName, "root", []string{"hostname"}) + if err != nil { + log.WithFields(log.Fields{ + "containerName": containerName, + "error": err, + "service": serviceName, + }).Error("Could not retrieve container name from the Docker client") + return "", err + } + + log.WithFields(log.Fields{ + "containerName": containerName, + "hostname": hostname, + "service": serviceName, + }).Info("Hostname retrieved from the Docker client") + + return hostname, nil +} + +func searchAgentData(hostname string, startDate time.Time, minimumHitsCount int, maxTimeout time.Duration) (e2e.SearchResult, error) { + timezone := "America/New_York" + now := time.Now() + + esQuery := map[string]interface{}{ + "version": true, + "size": 500, + "docvalue_fields": []map[string]interface{}{ + { + "field": "@timestamp", + "format": "date_time", + }, + { + "field": "system.process.cpu.start_time", + "format": "date_time", + }, + { + "field": "system.service.state_since", + "format": "date_time", + }, + }, + "_source": map[string]interface{}{ + "excludes": []map[string]interface{}{}, + }, + "query": map[string]interface{}{ + "bool": map[string]interface{}{ + "must": []map[string]interface{}{}, + "filter": []map[string]interface{}{ + { + "bool": map[string]interface{}{ + "filter": []map[string]interface{}{ + { + "bool": map[string]interface{}{ + "should": []map[string]interface{}{ + { + "match_phrase": map[string]interface{}{ + "host.name": hostname, + }, + }, + }, + "minimum_should_match": 1, + }, + }, + { + "bool": map[string]interface{}{ + "should": []map[string]interface{}{ + { + "range": map[string]interface{}{ + "@timestamp": map[string]interface{}{ + "gte": now, + "time_zone": timezone, + }, + }, + }, + }, + "minimum_should_match": 1, + }, + }, + }, + }, + }, + { + "range": map[string]interface{}{ + "@timestamp": map[string]interface{}{ + "gte": startDate, + "format": "strict_date_optional_time", + }, + }, + }, + }, + "should": []map[string]interface{}{}, + "must_not": []map[string]interface{}{}, + }, + }, + } + + indexName := "logs-agent-default" + + return e2e.WaitForNumberOfHits(indexName, esQuery, minimumHitsCount, maxTimeout) } diff --git a/e2e/_suites/metricbeat/metricbeat_test.go b/e2e/_suites/metricbeat/metricbeat_test.go index edfa89fdb6..8a9f134aa4 100644 --- a/e2e/_suites/metricbeat/metricbeat_test.go +++ b/e2e/_suites/metricbeat/metricbeat_test.go @@ -20,10 +20,6 @@ import ( // It can be overriden by OP_METRICBEAT_VERSION env var var metricbeatVersion = "7.7.0" -// queryMaxAttempts is the number of attempts to query elasticsearch before aborting -// It can be overriden by OP_QUERY_MAX_ATTEMPTS env var -var queryMaxAttempts = 5 - // queryRetryTimeout is the number of seconds between elasticsearch retry queries. // It can be overriden by OP_RETRY_TIMEOUT env var var queryRetryTimeout = 3 @@ -38,7 +34,6 @@ func init() { config.Init() metricbeatVersion = e2e.GetEnv("OP_METRICBEAT_VERSION", metricbeatVersion) - queryMaxAttempts = e2e.GetIntegerFromEnv("OP_QUERY_MAX_ATTEMPTS", queryMaxAttempts) queryRetryTimeout = e2e.GetIntegerFromEnv("OP_RETRY_TIMEOUT", queryRetryTimeout) stackVersion = e2e.GetEnv("OP_STACK_VERSION", stackVersion) @@ -216,7 +211,7 @@ func (mts *MetricbeatTestSuite) installedAndConfiguredForModule(serviceType stri // look up configurations under workspace's configurations directory dir, _ := os.Getwd() mts.configurationFile = path.Join(dir, "configurations", mts.ServiceName+".yml") - os.Chmod(mts.configurationFile, 0666) + _ = os.Chmod(mts.configurationFile, 0666) mts.setEventModule(mts.ServiceType) mts.setServiceVersion(mts.Version) @@ -403,12 +398,25 @@ func (mts *MetricbeatTestSuite) thereAreEventsInTheIndex() error { }, } - result, err := e2e.RetrySearch(mts.getIndexName(), esQuery, queryMaxAttempts, queryRetryTimeout) + minimumHitsCount := 5 + maxTimeout := time.Duration(queryRetryTimeout) * time.Minute + + result, err := e2e.WaitForNumberOfHits(mts.getIndexName(), esQuery, minimumHitsCount, maxTimeout) if err != nil { return err } - return e2e.AssertHitsArePresent(result, mts.Query) + err = e2e.AssertHitsArePresent(result) + if err != nil { + log.WithFields(log.Fields{ + "eventModule": mts.Query.EventModule, + "index": mts.Query.IndexName, + "serviceVersion": mts.Query.ServiceVersion, + }).Error(err.Error()) + return err + } + + return nil } func (mts *MetricbeatTestSuite) thereAreNoErrorsInTheIndex() error { @@ -426,7 +434,10 @@ func (mts *MetricbeatTestSuite) thereAreNoErrorsInTheIndex() error { }, } - result, err := e2e.RetrySearch(mts.getIndexName(), esQuery, queryMaxAttempts, queryRetryTimeout) + minimumHitsCount := 5 + maxTimeout := time.Duration(queryRetryTimeout) * time.Minute + + result, err := e2e.WaitForNumberOfHits(mts.getIndexName(), esQuery, minimumHitsCount, maxTimeout) if err != nil { return err } diff --git a/e2e/assertions.go b/e2e/assertions.go index e45be707b4..b121f8e100 100644 --- a/e2e/assertions.go +++ b/e2e/assertions.go @@ -7,12 +7,19 @@ import ( ) // AssertHitsArePresent returns an error if no hits are present -func AssertHitsArePresent(hits map[string]interface{}, q ElasticsearchQuery) error { - hitsCount := len(hits["hits"].(map[string]interface{})["hits"].([]interface{})) - if hitsCount == 0 { - return fmt.Errorf( - "There aren't documents for %s-%s on Metricbeat index %s", - q.EventModule, q.ServiceVersion, q.IndexName) +func AssertHitsArePresent(hits map[string]interface{}) error { + if getHitsCount(hits) == 0 { + return fmt.Errorf("There aren't documents in the index") + } + + return nil +} + +// AssertHitsAreNotPresent returns an error if hits are present +func AssertHitsAreNotPresent(hits map[string]interface{}) error { + count := getHitsCount(hits) + if count != 0 { + return fmt.Errorf("There are %d documents in the index", count) } return nil @@ -39,3 +46,7 @@ func AssertHitsDoNotContainErrors(hits map[string]interface{}, q ElasticsearchQu return nil } + +func getHitsCount(hits map[string]interface{}) int { + return len(hits["hits"].(map[string]interface{})["hits"].([]interface{})) +} diff --git a/e2e/elasticsearch.go b/e2e/elasticsearch.go index e15f96c139..33c4822eb1 100644 --- a/e2e/elasticsearch.go +++ b/e2e/elasticsearch.go @@ -83,6 +83,8 @@ func getElasticsearchClientFromHostPort(host string, port int) (*es.Client, erro cfg := es.Config{ Addresses: []string{fmt.Sprintf("http://%s:%d", host, port)}, + Username: "elastic", + Password: "changeme", } esClient, err := es.NewClient(cfg) if err != nil { @@ -98,7 +100,6 @@ func getElasticsearchClientFromHostPort(host string, port int) (*es.Client, erro } // RetrySearch executes a query over an inddex, with retry options -// maxAttempts could be redefined in the OP_QUERY_MAX_ATTEMPTS environment variable func RetrySearch(indexName string, esQuery map[string]interface{}, maxAttempts int, retryTimeout int) (SearchResult, error) { totalRetryTime := maxAttempts * retryTimeout @@ -256,3 +257,48 @@ func WaitForElasticsearchFromHostPort(host string, port int, maxTimeoutMinutes t return true, nil } + +// WaitForNumberOfHits waits for an elasticsearch query to return more than a number of hits, +// returning false if the query does not reach that number in a defined number of time. +func WaitForNumberOfHits(indexName string, query map[string]interface{}, desiredHits int, maxTimeout time.Duration) (SearchResult, error) { + exp := getExponentialBackOff(maxTimeout) + + retryCount := 1 + result := SearchResult{} + + numberOfHits := func() error { + hits, err := search(indexName, query) + if err != nil { + return err + } + + hitsCount := len(hits["hits"].(map[string]interface{})["hits"].([]interface{})) + if hitsCount < desiredHits { + log.WithFields(log.Fields{ + "currentHits": hitsCount, + "desiredHits": desiredHits, + "elapsedTime": exp.GetElapsedTime(), + "index": indexName, + "retry": retryCount, + }).Warn("Waiting for more hits in the index") + + retryCount++ + + return fmt.Errorf("Not enough hits in the index yet") + } + + result = hits + + log.WithFields(log.Fields{ + "currentHits": hitsCount, + "desiredHits": desiredHits, + "retries": retryCount, + "elapsedTime": exp.GetElapsedTime(), + }).Info("Hits number satisfied") + + return nil + } + + err := backoff.Retry(numberOfHits, exp) + return result, err +} diff --git a/e2e/utils.go b/e2e/utils.go index bfb36f1867..939a805eb0 100644 --- a/e2e/utils.go +++ b/e2e/utils.go @@ -138,7 +138,7 @@ func DownloadFile(url string) (string, error) { return filepath, err } - os.Chmod(tempFile.Name(), 0666) + _ = os.Chmod(tempFile.Name(), 0666) return filepath, nil }