diff --git a/dev-tools/jenkins_ci.ps1 b/dev-tools/jenkins_ci.ps1 index 08614ea1e30c..0a4707ea3716 100755 --- a/dev-tools/jenkins_ci.ps1 +++ b/dev-tools/jenkins_ci.ps1 @@ -28,10 +28,6 @@ $env:RACE_DETECTOR = "true" # Install mage from vendor. exec { go install github.com/elastic/beats/vendor/github.com/magefile/mage } -echo "Fetching testing dependencies" -# TODO (elastic/beats#5050): Use a vendored copy of this. -exec { go get github.com/docker/libcompose } - if (Test-Path "$env:beat") { cd "$env:beat" } else { diff --git a/libbeat/scripts/Makefile b/libbeat/scripts/Makefile index c876387cc945..8092919fd7e3 100755 --- a/libbeat/scripts/Makefile +++ b/libbeat/scripts/Makefile @@ -45,7 +45,6 @@ COVERAGE_DIR?=${BUILD_DIR}/coverage COVERAGE_TOOL?=${BEAT_GOPATH}/bin/gotestcover COVERAGE_TOOL_REPO?=github.com/elastic/beats/vendor/github.com/pierrre/gotestcover TESTIFY_TOOL_REPO?=github.com/elastic/beats/vendor/github.com/stretchr/testify/assert -LIBCOMPOSE_TOOL_REPO?=github.com/docker/libcompose NOW=$(shell date -u '+%Y-%m-%dT%H:%M:%SZ') GOBUILD_FLAGS?=-i -ldflags "-X github.com/elastic/beats/libbeat/version.buildTime=$(NOW) -X github.com/elastic/beats/libbeat/version.commit=$(COMMIT_ID)" GOIMPORTS=goimports @@ -172,8 +171,6 @@ prepare-tests: go get ${COVERAGE_TOOL_REPO} # testify is needed for unit and integration tests go get ${TESTIFY_TOOL_REPO} - # libcompose is needed for integration tests - go get ${LIBCOMPOSE_TOOL_REPO} .PHONY: unit-tests unit-tests: ## @testing Runs the unit tests with coverage. Race is not enabled for unit tests because tests run much slower. 
diff --git a/libbeat/tests/compose/compose.go b/libbeat/tests/compose/compose.go index 3a8cc4a39105..9cd6dd205a8f 100644 --- a/libbeat/tests/compose/compose.go +++ b/libbeat/tests/compose/compose.go @@ -18,41 +18,13 @@ package compose import ( - "context" "errors" - "fmt" "os" "path/filepath" - "regexp" - "strings" - "testing" - "time" - "strconv" - - "github.com/docker/libcompose/docker" - "github.com/docker/libcompose/docker/ctx" - "github.com/docker/libcompose/project" - "github.com/docker/libcompose/project/options" + "testing" ) -// docker-compose project wrapper -type composeProject struct { - p project.APIProject - file string -} - -type serviceInfo struct { - Name string - Running bool - Healthy bool - // Has been up for too long?: - Old bool -} - -// Regexp matching state to flag container as old -var oldRegexp = regexp.MustCompile("minute") - // EnsureUp starts all the requested services (must be defined in docker-compose.yml) // with a default timeout of 300 seconds func EnsureUp(t *testing.T, services ...string) { @@ -93,169 +65,7 @@ func EnsureUpWithTimeout(t *testing.T, timeout int, services ...string) { } } -// Start the container, unless it's running already -func (c *composeProject) Start(service string) error { - servicesStatus, err := c.getServices(service) - if err != nil { - return err - } - - if servicesStatus[service] != nil { - if servicesStatus[service].Running { - // Someone is running it - return nil - } - } - - c.Lock() - defer c.Unlock() - - return c.p.Up(context.Background(), options.Up{ - Create: options.Create{ - ForceBuild: true, - }, - }, service) -} - -// Ensure all wanted services are healthy. Wait loop (60s timeout) -func (c *composeProject) Wait(seconds int, services ...string) error { - healthy := false - for !healthy && seconds > 0 { - healthy = true - - servicesStatus, err := c.getServices(services...) 
- if err != nil { - return err - } - - for _, s := range servicesStatus { - if !s.Healthy { - healthy = false - break - } - } - - time.Sleep(1 * time.Second) - seconds-- - } - - if !healthy { - return errors.New("Timeout waiting for services to be healthy") - } - return nil -} - -func (c *composeProject) Kill(service string) error { - c.Lock() - defer c.Unlock() - - return c.p.Kill(context.Background(), "KILL", service) -} - -func (c *composeProject) KillOld(except []string) error { - // Do not kill ourselves ;) - except = append(except, "beat") - - // These services take very long to start up and stop. If they are stopped - // it can happen that an other package tries to start them at the same time - // which leads to a conflict. We need a better solution long term but that should - // solve the problem for now. - except = append(except, "elasticsearch", "kibana", "logstash", "kubernetes") - - servicesStatus, err := c.getServices() - if err != nil { - return err - } - - for _, s := range servicesStatus { - // Ignore the ones we want - if contains(except, s.Name) { - continue - } - - if s.Old { - err = c.Kill(s.Name) - if err != nil { - return err - } - } - } - - return nil -} - -// Lock acquires the lock (300s) timeout -// Normally it should only be seconds that the lock is used, but in some cases it can take longer. 
-func (c *composeProject) Lock() { - seconds := 300 - for seconds > 0 { - file, err := os.OpenFile(c.file+".lock", os.O_CREATE|os.O_EXCL, 0500) - file.Close() - if err != nil { - fmt.Println("docker-compose.yml is locked, waiting") - time.Sleep(1 * time.Second) - seconds-- - continue - } - return - } - - // This should rarely happen as we lock for start only, less than a second - panic(errors.New("Timeout waiting for lock, please remove docker-compose.yml.lock")) -} - -func (c *composeProject) Unlock() { - os.Remove(c.file + ".lock") -} - -func (c *composeProject) getServices(filter ...string) (map[string]*serviceInfo, error) { - c.Lock() - defer c.Unlock() - - result := make(map[string]*serviceInfo) - services, err := c.p.Ps(context.Background(), filter...) - if err != nil { - return nil, err - } - - containers, err := c.p.Containers(context.Background(), project.Filter{State: project.Running}, filter...) - if err != nil { - return nil, err - } - - for _, c := range services { - name := strings.Split(c["Name"], "_")[1] - // In case of several (stopped) containers, always prefer info about running ones - if result[name] != nil { - if result[name].Running { - continue - } - } - - service := &serviceInfo{ - Name: name, - } - // fill details: - service.Healthy = strings.Contains(c["State"], "(healthy)") - service.Running = contains(containers, c["Id"]) - if service.Healthy { - service.Old = oldRegexp.MatchString(c["State"]) - } - result[name] = service - } - return result, nil -} - -func contains(list []string, item string) bool { - for _, i := range list { - if item == i { - return true - } - } - return false -} - -func getComposeProject() (*composeProject, error) { +func getComposeProject() (*Project, error) { // find docker-compose path, err := os.Getwd() if err != nil { @@ -266,22 +76,17 @@ func getComposeProject() (*composeProject, error) { return nil, errors.New("docker-compose.yml not found") } - if _, err = os.Stat(path + "/docker-compose.yml"); err != nil { + 
if _, err = os.Stat(filepath.Join(path, "docker-compose.yml")); err != nil { path = filepath.Dir(path) } else { break } } - project, err := docker.NewProject(&ctx.Context{ - Context: project.Context{ - ProjectName: os.Getenv("DOCKER_COMPOSE_PROJECT_NAME"), - ComposeFiles: []string{path + "/docker-compose.yml"}, + return NewProject( + os.Getenv("DOCKER_COMPOSE_PROJECT_NAME"), + []string{ + filepath.Join(path, "docker-compose.yml"), }, - }, nil) - if err != nil { - return nil, err - } - - return &composeProject{project, path + "/docker-compose.yml"}, nil + ) } diff --git a/libbeat/tests/compose/project.go b/libbeat/tests/compose/project.go new file mode 100644 index 000000000000..6df5d3585600 --- /dev/null +++ b/libbeat/tests/compose/project.go @@ -0,0 +1,275 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package compose + +import ( + "context" + "errors" + "os" + "path/filepath" + "time" + + "github.com/elastic/beats/libbeat/logp" +) + +// CreateOptions are the options when containers are created +type CreateOptions struct { + Build bool + ForceRecreate bool +} + +// UpOptions are the options when containers are started +type UpOptions struct { + Create CreateOptions +} + +// Filter options for services +type Filter struct { + State State +} + +// State of a service for filtering +type State string + +// Possible states of a service for filtering +const ( + AnyState = State("") + RunningState = State("running") + StoppedState = State("stopped") +) + +// Driver is the interface of docker compose implementations +type Driver interface { + Up(ctx context.Context, opts UpOptions, service string) error + Kill(ctx context.Context, signal string, service string) error + Ps(ctx context.Context, filter ...string) ([]ContainerStatus, error) + // Containers(ctx context.Context, projectFilter Filter, filter ...string) ([]string, error) + + LockFile() string +} + +// ContainerStatus is an interface to obtain the status of a container +type ContainerStatus interface { + ServiceName() string + Healthy() bool + Running() bool + Old() bool +} + +// Project is a docker-compose project +type Project struct { + Driver +} + +// NewProject creates a new docker-compose project +func NewProject(name string, files []string) (*Project, error) { + if len(files) == 0 { + return nil, errors.New("project needs at least one file") + } + if name == "" { + name = filepath.Base(filepath.Dir(files[0])) + } + return &Project{ + &wrapperDriver{ + Name: name, + Files: files, + }, + }, nil +} + +// Start the container, unless it's running already +func (c *Project) Start(service string) error { + servicesStatus, err := c.getServices(service) + if err != nil { + return err + } + + if servicesStatus[service] != nil { + if servicesStatus[service].Running { + // Someone is running it + return nil + } + } 
+ + c.Lock() + defer c.Unlock() + + return c.Driver.Up(context.Background(), UpOptions{ + Create: CreateOptions{ + Build: true, + ForceRecreate: true, + }, + }, service) +} + +// Wait ensures all wanted services are healthy, waiting up to the given number of seconds +func (c *Project) Wait(seconds int, services ...string) error { + healthy := false + timeout := time.Now().Add(time.Duration(seconds) * time.Second) + for !healthy && time.Now().Before(timeout) { + healthy = true + + servicesStatus, err := c.getServices(services...) + if err != nil { + return err + } + + if len(servicesStatus) == 0 { + healthy = false + } + + for _, s := range servicesStatus { + if !s.Healthy { + healthy = false + break + } + } + + time.Sleep(1 * time.Second) + } + + if !healthy { + return errors.New("Timeout waiting for services to be healthy") + } + return nil +} + +// Kill a container +func (c *Project) Kill(service string) error { + c.Lock() + defer c.Unlock() + + return c.Driver.Kill(context.Background(), "KILL", service) +} + +// KillOld kills old containers +func (c *Project) KillOld(except []string) error { + // Do not kill ourselves ;) + except = append(except, "beat") + + // These services take very long to start up and stop. If they are stopped + // it can happen that another package tries to start them at the same time + // which leads to a conflict. We need a better solution long term but that should + // solve the problem for now. + except = append(except, "elasticsearch", "kibana", "logstash", "kubernetes") + + servicesStatus, err := c.getServices() + if err != nil { + return err + } + + for _, s := range servicesStatus { + // Ignore the ones we want + if contains(except, s.Name) { + continue + } + + if s.Old { + err = c.Kill(s.Name) + if err != nil { + return err + } + } + } + + return nil +} + +// Lock acquires the lock (300s timeout) +// Normally it should only be seconds that the lock is used, but in some cases it can take longer. 
+func (c *Project) Lock() { + timeout := time.Now().Add(300 * time.Second) + infoShown := false + for time.Now().Before(timeout) { + file, err := os.OpenFile(c.LockFile(), os.O_CREATE|os.O_EXCL, 0500) + file.Close() + if err != nil { + if !infoShown { + logp.Info("docker-compose.yml is locked, waiting") + infoShown = true + } + time.Sleep(1 * time.Second) + continue + } + if infoShown { + logp.Info("docker-compose.yml lock acquired") + } + return + } + + // This should rarely happen as we lock for start only, less than a second + panic(errors.New("Timeout waiting for lock, please remove docker-compose.yml.lock")) +} + +// Unlock releases the project lock +func (c *Project) Unlock() { + os.Remove(c.LockFile()) +} + +type serviceInfo struct { + Name string + Running bool + Healthy bool + + // Has been up for too long?: + Old bool +} + +func (c *Project) getServices(filter ...string) (map[string]*serviceInfo, error) { + c.Lock() + defer c.Unlock() + + result := make(map[string]*serviceInfo) + services, err := c.Driver.Ps(context.Background(), filter...) + if err != nil { + return nil, err + } + + for _, c := range services { + name := c.ServiceName() + + // In case of several (stopped) containers, always prefer info about running ones + if result[name] != nil { + if result[name].Running { + continue + } + } + + service := &serviceInfo{ + Name: name, + } + // fill details: + service.Healthy = c.Healthy() + service.Running = c.Running() + if service.Healthy { + service.Old = c.Old() + } + result[name] = service + } + + return result, nil +} + +func contains(list []string, item string) bool { + for _, i := range list { + if item == i { + return true + } + } + return false +} diff --git a/libbeat/tests/compose/wrapper.go b/libbeat/tests/compose/wrapper.go new file mode 100644 index 000000000000..f8ae629584a7 --- /dev/null +++ b/libbeat/tests/compose/wrapper.go @@ -0,0 +1,192 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. 
See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package compose + +import ( + "context" + "fmt" + "os" + "os/exec" + "strings" + + "github.com/pkg/errors" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/filters" + "github.com/docker/docker/client" +) + +const ( + labelComposeService = "com.docker.compose.service" + labelComposeProject = "com.docker.compose.project" +) + +type wrapperDriver struct { + Name string + Files []string +} + +type wrapperContainer struct { + info types.Container +} + +func (c *wrapperContainer) ServiceName() string { + return c.info.Labels[labelComposeService] +} + +func (c *wrapperContainer) Healthy() bool { + return strings.Contains(c.info.Status, "(healthy)") +} + +func (c *wrapperContainer) Running() bool { + return c.info.State == "running" +} + +func (c *wrapperContainer) Old() bool { + return strings.Contains(c.info.Status, "minute") +} + +func (d *wrapperDriver) LockFile() string { + return d.Files[0] + ".lock" +} + +func (d *wrapperDriver) cmd(ctx context.Context, command string, arg ...string) *exec.Cmd { + var args []string + args = append(args, "--project-name", d.Name) + for _, f := range d.Files { + args = append(args, "--file", f) + } + args = append(args, command) + args = append(args, arg...) 
+ cmd := exec.CommandContext(ctx, "docker-compose", args...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + return cmd +} + +func (d *wrapperDriver) Up(ctx context.Context, opts UpOptions, service string) error { + var args []string + + args = append(args, "-d") + + if opts.Create.Build { + args = append(args, "--build") + } + + if opts.Create.ForceRecreate { + args = append(args, "--force-recreate") + } + + if service != "" { + args = append(args, service) + } + + return d.cmd(ctx, "up", args...).Run() +} + +func (d *wrapperDriver) Kill(ctx context.Context, signal string, service string) error { + var args []string + + if signal != "" { + args = append(args, "-s", signal) + } + + if service != "" { + args = append(args, service) + } + + return d.cmd(ctx, "kill", args...).Run() +} + +func (d *wrapperDriver) Ps(ctx context.Context, filter ...string) ([]ContainerStatus, error) { + containers, err := d.containers(ctx, Filter{State: AnyState}, filter...) + if err != nil { + return nil, errors.Wrap(err, "ps") + } + + ps := make([]ContainerStatus, len(containers)) + for i, c := range containers { + ps[i] = &wrapperContainer{info: c} + } + return ps, nil +} + +func (d *wrapperDriver) Containers(ctx context.Context, projectFilter Filter, filter ...string) ([]string, error) { + containers, err := d.containers(ctx, projectFilter, filter...) 
+ if err != nil { + return nil, errors.Wrap(err, "containers") + } + + ids := make([]string, len(containers)) + for i := range containers { + ids[i] = containers[i].ID + } + return ids, nil +} + +func (d *wrapperDriver) containers(ctx context.Context, projectFilter Filter, filter ...string) ([]types.Container, error) { + cli, err := client.NewEnvClient() + if err != nil { + return nil, errors.Wrap(err, "failed to start docker client") + } + + var serviceFilters []filters.Args + if len(filter) == 0 { + f := makeFilter(d.Name, "", projectFilter) + serviceFilters = append(serviceFilters, f) + } else { + for _, service := range filter { + f := makeFilter(d.Name, service, projectFilter) + serviceFilters = append(serviceFilters, f) + } + } + + var containers []types.Container + for _, f := range serviceFilters { + c, err := cli.ContainerList(ctx, types.ContainerListOptions{ + All: true, + Filters: f, + }) + if err != nil { + return nil, errors.Wrap(err, "failed to get container list") + } + containers = append(containers, c...) 
+ } + + return containers, nil +} + +func makeFilter(project, service string, projectFilter Filter) filters.Args { + f := filters.NewArgs() + f.Add("label", fmt.Sprintf("%s=%s", labelComposeProject, project)) + + if service != "" { + f.Add("label", fmt.Sprintf("%s=%s", labelComposeService, service)) + } + + switch projectFilter.State { + case AnyState: + // No filter + case RunningState: + f.Add("status", "running") + case StoppedState: + f.Add("status", "exited") + } + + return f +} diff --git a/libbeat/tests/system/beat/compose.py b/libbeat/tests/system/beat/compose.py index 39cf4249f020..54387f006df7 100644 --- a/libbeat/tests/system/beat/compose.py +++ b/libbeat/tests/system/beat/compose.py @@ -9,6 +9,7 @@ if INTEGRATION_TESTS: from compose.cli.command import get_project from compose.service import BuildAction + from compose.service import ConvergenceStrategy class ComposeMixin(object): @@ -46,6 +47,7 @@ def is_healthy(container): project = cls.compose_project() project.up( + strategy=ConvergenceStrategy.always, service_names=cls.COMPOSE_SERVICES, do_build=BuildAction.force, timeout=30) diff --git a/metricbeat/Dockerfile b/metricbeat/Dockerfile index c2d6ffcae2eb..8ab5e06e414e 100644 --- a/metricbeat/Dockerfile +++ b/metricbeat/Dockerfile @@ -7,7 +7,9 @@ RUN set -x && \ netcat python-pip virtualenv && \ apt-get clean +RUN pip install --upgrade pip RUN pip install --upgrade setuptools +RUN pip install --upgrade docker-compose==1.21.0 # Setup work environment ENV METRICBEAT_PATH /go/src/github.com/elastic/beats/metricbeat