diff --git a/e2e/_suites/fleet/features/backend_processes.feature b/e2e/_suites/fleet/features/backend_processes.feature index 35df13c32e..151d5b04fb 100644 --- a/e2e/_suites/fleet/features/backend_processes.feature +++ b/e2e/_suites/fleet/features/backend_processes.feature @@ -6,8 +6,8 @@ Feature: Backend Processes Scenario Outline: Deploying the agent Given a "" agent is deployed to Fleet with "tar" installer When the "elastic-agent" process is in the "started" state on the host - Then the "filebeat" process is in the "started" state on the host - And the "metricbeat" process is in the "started" state on the host + Then there are "2" instances of the "filebeat" process in the "started" state + And there are "2" instances of the "metricbeat" process in the "started" state @centos Examples: Centos @@ -23,8 +23,8 @@ Examples: Debian Scenario Outline: Deploying the agent with enroll and then run on rpm and deb Given a "" agent is deployed to Fleet with "systemd" installer When the "elastic-agent" process is in the "started" state on the host - Then the "filebeat" process is in the "started" state on the host - And the "metricbeat" process is in the "started" state on the host + Then there are "2" instances of the "filebeat" process in the "started" state + And there are "2" instances of the "metricbeat" process in the "started" state @centos Examples: Centos @@ -57,8 +57,8 @@ Examples: Debian Scenario Outline: Restarting the installed agent Given a "" agent is deployed to Fleet with "tar" installer When the "elastic-agent" process is "restarted" on the host - Then the "filebeat" process is in the "started" state on the host - And the "metricbeat" process is in the "started" state on the host + Then there are "2" instances of the "filebeat" process in the "started" state + And there are "2" instances of the "metricbeat" process in the "started" state @centos Examples: Centos @@ -75,8 +75,8 @@ Scenario Outline: Restarting the host with persistent agent restarts backen 
Given a "" agent is deployed to Fleet with "tar" installer When the host is restarted Then the "elastic-agent" process is in the "started" state on the host - And the "filebeat" process is in the "started" state on the host - And the "metricbeat" process is in the "started" state on the host + And there are "2" instances of the "filebeat" process in the "started" state + And there are "2" instances of the "metricbeat" process in the "started" state @centos Examples: Centos diff --git a/e2e/_suites/fleet/features/stand_alone_agent.feature b/e2e/_suites/fleet/features/stand_alone_agent.feature index 20a44ce177..639a21a676 100644 --- a/e2e/_suites/fleet/features/stand_alone_agent.feature +++ b/e2e/_suites/fleet/features/stand_alone_agent.feature @@ -7,8 +7,8 @@ Feature: Stand-alone Agent @start-agent Scenario Outline: Starting the agent starts backend processes When a "" stand-alone agent is deployed - Then the "filebeat" process is in the "started" state on the host - And the "metricbeat" process is in the "started" state on the host + Then there are "1" instances of the "filebeat" process in the "started" state + And there are "2" instances of the "metricbeat" process in the "started" state @default Examples: default diff --git a/e2e/_suites/fleet/fleet.go b/e2e/_suites/fleet/fleet.go index 891ed275c3..595eb1ad7d 100644 --- a/e2e/_suites/fleet/fleet.go +++ b/e2e/_suites/fleet/fleet.go @@ -441,7 +441,7 @@ func (fts *FleetTestSuite) processStateChangedOnTheHost(process string, state st containerName := fts.getContainerName(agentInstaller, 1) - return docker.CheckProcessStateOnTheHost(containerName, process, "stopped", common.TimeoutFactor) + return docker.CheckProcessStateOnTheHost(containerName, process, "stopped", 1, common.TimeoutFactor) } func (fts *FleetTestSuite) setup() error { @@ -543,7 +543,7 @@ func (fts *FleetTestSuite) theFileSystemAgentFolderIsEmpty() error { return err } - if strings.Contains(content, "No such file or directory") { + if content == "" || 
strings.Contains(content, "No such file or directory") { return nil } diff --git a/e2e/_suites/fleet/ingest_manager_test.go b/e2e/_suites/fleet/ingest_manager_test.go index b665a2e1f7..806e203a5c 100644 --- a/e2e/_suites/fleet/ingest_manager_test.go +++ b/e2e/_suites/fleet/ingest_manager_test.go @@ -104,6 +104,7 @@ func InitializeIngestManagerTestScenario(ctx *godog.ScenarioContext) { }) ctx.Step(`^the "([^"]*)" process is in the "([^"]*)" state on the host$`, imts.processStateOnTheHost) + ctx.Step(`^there are "([^"]*)" instances of the "([^"]*)" process in the "([^"]*)" state$`, imts.thereAreInstancesOfTheProcessInTheState) imts.Fleet.contributeSteps(ctx) } diff --git a/e2e/_suites/fleet/world.go b/e2e/_suites/fleet/world.go index 27cc74345f..132f56f575 100644 --- a/e2e/_suites/fleet/world.go +++ b/e2e/_suites/fleet/world.go @@ -6,6 +6,7 @@ package main import ( "fmt" + "strconv" "github.com/elastic/e2e-testing/internal/common" "github.com/elastic/e2e-testing/internal/docker" @@ -17,6 +18,10 @@ type IngestManagerTestSuite struct { } func (imts *IngestManagerTestSuite) processStateOnTheHost(process string, state string) error { + return imts.thereAreInstancesOfTheProcessInTheState("1", process, state) +} + +func (imts *IngestManagerTestSuite) thereAreInstancesOfTheProcessInTheState(ocurrences string, process string, state string) error { profile := common.FleetProfileName var containerName string @@ -28,5 +33,10 @@ func (imts *IngestManagerTestSuite) processStateOnTheHost(process string, state containerName = imts.Fleet.getContainerName(agentInstaller, 1) } - return docker.CheckProcessStateOnTheHost(containerName, process, state, common.TimeoutFactor) + count, err := strconv.Atoi(ocurrences) + if err != nil { + return err + } + + return docker.CheckProcessStateOnTheHost(containerName, process, state, count, common.TimeoutFactor) } diff --git a/internal/docker/docker.go b/internal/docker/docker.go index 213d0237a8..c77ab9b42d 100644 --- a/internal/docker/docker.go 
+++ b/internal/docker/docker.go @@ -22,6 +22,7 @@ import ( "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/filters" "github.com/docker/docker/client" + "github.com/docker/docker/pkg/stdcopy" "github.com/elastic/e2e-testing/internal/common" log "github.com/sirupsen/logrus" ) @@ -31,6 +32,12 @@ var instance *client.Client // OPNetworkName name of the network used by the tool const OPNetworkName = "elastic-dev-network" +type execResult struct { + StdOut string + StdErr string + ExitCode int +} + func buildTarForDeployment(file *os.File) (bytes.Buffer, error) { fileInfo, _ := file.Stat() @@ -66,10 +73,10 @@ func buildTarForDeployment(file *os.File) (bytes.Buffer, error) { // we are using the Docker client instead of docker-compose // because it does not support returning the output of a // command: it simply returns error level -func CheckProcessStateOnTheHost(containerName string, process string, state string, timeoutFactor int) error { +func CheckProcessStateOnTheHost(containerName string, process string, state string, occurrences int, timeoutFactor int) error { timeout := time.Duration(common.TimeoutFactor) * time.Minute - err := WaitForProcess(containerName, process, state, timeout) + err := WaitForProcess(containerName, process, state, occurrences, timeout) if err != nil { if state == "started" { log.WithFields(log.Fields{ @@ -226,8 +233,31 @@ func ExecCommandIntoContainerWithEnv(ctx context.Context, containerName string, } defer resp.Close() - buf := new(bytes.Buffer) - _, err = buf.ReadFrom(resp.Reader) + // see https://stackoverflow.com/a/57132902 + var execRes execResult + + // read the output in a goroutine so the wait below can be interrupted by ctx; the channel is buffered so the goroutine can always deliver its result and exit, even when nobody is receiving anymore + var outBuf, errBuf bytes.Buffer + outputDone := make(chan error, 1) + + go func() { + // StdCopy demultiplexes the stream into two buffers; its error is kept local to this goroutine instead of writing to the caller's err + _, copyErr := stdcopy.StdCopy(&outBuf, &errBuf, resp.Reader) + outputDone <- copyErr + }() + + select { + case err := <-outputDone: + if err != nil { + return "", err + } + break + + case <-ctx.Done(): + return "", 
ctx.Err() + } + + stdout, err := ioutil.ReadAll(&outBuf) if err != nil { log.WithFields(log.Fields{ "container": containerName, @@ -236,35 +266,28 @@ func ExecCommandIntoContainerWithEnv(ctx context.Context, containerName string, "env": env, "error": err, "tty": tty, - }).Error("Could not parse command output from container") + }).Error("Could not parse stdout from container") return "", err } - output := buf.String() - - log.WithFields(log.Fields{ - "container": containerName, - "command": cmd, - "detach": detach, - "env": env, - "tty": tty, - }).Trace("Command sucessfully executed in container") - - output = strings.ReplaceAll(output, "\n", "") - - patterns := []string{ - "\x01\x00\x00\x00\x00\x00\x00\r", - "\x01\x00\x00\x00\x00\x00\x00)", - } - for _, pattern := range patterns { - if strings.HasPrefix(output, pattern) { - output = strings.ReplaceAll(output, pattern, "") - log.WithFields(log.Fields{ - "output": output, - }).Trace("Output name has been sanitized") - } + stderr, err := ioutil.ReadAll(&errBuf) + if err != nil { + log.WithFields(log.Fields{ + "container": containerName, + "command": cmd, + "detach": detach, + "env": env, + "error": err, + "tty": tty, + }).Error("Could not parse stderr from container") + return "", err } - return output, nil + execRes.ExitCode = 0 + execRes.StdOut = string(stdout) + execRes.StdErr = string(stderr) + + // remove '\n' from the response + return strings.ReplaceAll(execRes.StdOut, "\n", ""), nil } // GetContainerHostname we need the container name because we use the Docker Client instead of Docker Compose @@ -445,7 +468,7 @@ func RemoveDevNetwork() error { // WaitForProcess polls a container executing "ps" command until the process is in the desired state (present or not), // or a timeout happens -func WaitForProcess(containerName string, process string, desiredState string, maxTimeout time.Duration) error { +func WaitForProcess(containerName string, process string, desiredState string, ocurrences int, maxTimeout 
time.Duration) error { exp := common.GetExponentialBackOff(maxTimeout) mustBePresent := false @@ -457,49 +480,125 @@ func WaitForProcess(containerName string, process string, desiredState string, m processStatus := func() error { log.WithFields(log.Fields{ "desiredState": desiredState, + "ocurrences": ocurrences, "process": process, }).Trace("Checking process desired state on the container") - output, err := ExecCommandIntoContainer(context.Background(), containerName, "root", []string{"pgrep", "-n", "-l", process}) + // pgrep -d: -d, --delimiter specify output delimiter + //i.e. "pgrep -d , metricbeat": 483,519 + cmds := []string{"pgrep", "-d", ",", process} + output, err := ExecCommandIntoContainer(context.Background(), containerName, "root", cmds) if err != nil { log.WithFields(log.Fields{ + "cmds": cmds, "desiredState": desiredState, "elapsedTime": exp.GetElapsedTime(), "error": err, "container": containerName, "mustBePresent": mustBePresent, + "ocurrences": ocurrences, "process": process, "retry": retryCount, - }).Warn("Could not execute 'pgrep -n -l' in the container") + }).Warn("Could not get number of processes in the container") retryCount++ return err } - outputContainsProcess := strings.Contains(output, process) + // tokenize the pids to get each pid's state, adding them to an array if they match the desired state + // From Split docs: + // If output does not contain sep and sep is not empty, Split returns a + // slice of length 1 whose only element is s, that's why we first initialise to the empty array + pids := strings.Split(output, ",") + if len(pids) == 1 && pids[0] == "" { + pids = []string{} + } + + log.WithFields(log.Fields{ + "count": len(pids), + "desiredState": desiredState, + "mustBePresent": mustBePresent, + "pids": pids, + "process": process, + }).Tracef("Pids for process found") + + desiredStatePids := []string{} + + for _, pid := range pids { + pidStateCmds := []string{"ps", "-q", pid, "-o", "state", "--no-headers"} + pidState, err := 
ExecCommandIntoContainer(context.Background(), containerName, "root", pidStateCmds) + if err != nil { + log.WithFields(log.Fields{ + "cmds": pidStateCmds, + "desiredState": desiredState, + "elapsedTime": exp.GetElapsedTime(), + "error": err, + "container": containerName, + "mustBePresent": mustBePresent, + "ocurrences": ocurrences, + "pid": pid, + "process": process, + "retry": retryCount, + }).Warn("Could not check pid status in the container") + + retryCount++ + + return err + } - // both true or both false - if mustBePresent == outputContainsProcess { log.WithFields(log.Fields{ "desiredState": desiredState, - "container": containerName, "mustBePresent": mustBePresent, + "pid": pid, + "pidState": pidState, "process": process, + }).Tracef("Checking if process is in the S state") + + // if the process must be present, then check for the S state + // From 'man ps': + // D uninterruptible sleep (usually IO) + // R running or runnable (on run queue) + // S interruptible sleep (waiting for an event to complete) + // T stopped by job control signal + // t stopped by debugger during the tracing + // W paging (not valid since the 2.6.xx kernel) + // X dead (should never be seen) + // Z defunct ("zombie") process, terminated but not reaped by its parent + if mustBePresent && pidState == "S" { + desiredStatePids = append(desiredStatePids, pid) + } else if !mustBePresent { + desiredStatePids = append(desiredStatePids, pid) + } + } + + desiredStateReached := (mustBePresent && len(desiredStatePids) == ocurrences) || (!mustBePresent && len(desiredStatePids) == 0) + + // must be present: exactly the requested number of instances in the S state; must not be present: no instance left at all, whatever the requested number is + if desiredStateReached { + log.WithFields(log.Fields{ + "desiredOcurrences": ocurrences, + "desiredState": desiredState, + "container": containerName, + "mustBePresent": mustBePresent, + "ocurrences": len(desiredStatePids), + "process": process, }).Infof("Process desired state checked") return nil } if mustBePresent { - err = fmt.Errorf("%s process is not running in the container yet", process) + err = fmt.Errorf("%s process is not running in 
the container with the desired number of occurrences (%d) yet", process, ocurrences) log.WithFields(log.Fields{ - "desiredState": desiredState, - "elapsedTime": exp.GetElapsedTime(), - "error": err, - "container": containerName, - "process": process, - "retry": retryCount, + "desiredOcurrences": ocurrences, + "desiredState": desiredState, + "elapsedTime": exp.GetElapsedTime(), + "error": err, + "container": containerName, + "ocurrences": len(desiredStatePids), + "process": process, + "retry": retryCount, }).Warn(err.Error()) retryCount++ @@ -509,12 +608,14 @@ func WaitForProcess(containerName string, process string, desiredState string, m err = fmt.Errorf("%s process is still running in the container", process) log.WithFields(log.Fields{ - "elapsedTime": exp.GetElapsedTime(), - "error": err, - "container": containerName, - "process": process, - "state": desiredState, - "retry": retryCount, + "desiredOcurrences": ocurrences, + "elapsedTime": exp.GetElapsedTime(), + "error": err, + "container": containerName, + "ocurrences": len(desiredStatePids), + "process": process, + "state": desiredState, + "retry": retryCount, }).Warn(err.Error()) retryCount++