From e2a3eef349cb6643c08a7840d8cbd43b38fedfd5 Mon Sep 17 00:00:00 2001 From: Pedro Larroy <928489+larroy@users.noreply.github.com> Date: Tue, 28 Aug 2018 21:16:31 +0200 Subject: [PATCH] A solution to prevent zombie containers locally and in CI (#12381) Fix pylint, mypy, and pycharm code inspection warnings --- ci/README.md | 14 +++ ci/build.py | 304 ++++++++++++++++++++++++++++++++++++++------------- 2 files changed, 243 insertions(+), 75 deletions(-) diff --git a/ci/README.md b/ci/README.md index 548e9cb9b042..693087569434 100644 --- a/ci/README.md +++ b/ci/README.md @@ -59,6 +59,20 @@ To work inside a container with a shell you can do: When building, the artifacts are located in the build/ directory in the project root. In case `build.py -a` is invoked, the artifacts are located in build./ +# Docker container cleanup (Zombie containers) +Docker has a client-server architecture, so when the program that is executing the docker client +dies or receives a signal, the container keeps running as it's started by the docker daemon. +We implement signal handlers that catch sigterm and sigint and clean up containers before exit. In +Jenkins there's not enough time between sigterm and sigkill so we guarantee that containers are not +left running by propagating environment variables used by the Jenkins process tree killer to +identify which process to kill when the job is stopped. This has the effect of stopping the +container given that the process inside the container is terminated. + +How to test this is working properly: On the console you can hit ^C while a container is running +(not just building) and see that the container is stopped by running `docker ps` on another +terminal. In Jenkins this has been tested by stopping the job which has containers running and +verifying that the container stops shortly afterwards by running docker ps. 
+ ## Add a platform To add a platform, you should add the appropriate dockerfile in diff --git a/ci/build.py b/ci/build.py index f1a5e99e2d0e..df9e97bdb5fd 100755 --- a/ci/build.py +++ b/ci/build.py @@ -23,26 +23,67 @@ """ __author__ = 'Marco de Abreu, Kellen Sunderland, Anton Chernov, Pedro Larroy' -__version__ = '0.2' +__version__ = '0.3' import argparse import glob import logging +import os import re import shutil import subprocess import sys import tempfile -from copy import deepcopy from itertools import chain -from subprocess import call, check_call, check_output +from subprocess import check_call, check_output from typing import * from util import * +import docker +import docker.models +import docker.errors +import signal +import atexit import pprint -import requests -CCACHE_MAXSIZE = '500G' +class Cleanup: + """A class to cleanup containers""" + def __init__(self): + self.containers = set() + self.docker_stop_timeout = 3 + + def add_container(self, container: docker.models.containers.Container): + assert isinstance(container, docker.models.containers.Container) + self.containers.add(container) + + def remove_container(self, container: docker.models.containers.Container): + assert isinstance(container, docker.models.containers.Container) + self.containers.remove(container) + + def _cleanup_containers(self): + if self.containers: + logging.warning("Cleaning up containers") + else: + return + # noinspection PyBroadException + try: + stop_timeout = int(os.environ.get("DOCKER_STOP_TIMEOUT", self.docker_stop_timeout)) + except Exception: + stop_timeout = 3 + for container in self.containers: + try: + container.stop(timeout=stop_timeout) + logging.info("☠: stopped container %s", trim_container_id(container.id)) + container.remove() + logging.info("🚽: removed container %s", trim_container_id(container.id)) + except Exception as e: + logging.exception(e) + self.containers.clear() + logging.info("Cleaning up containers finished.") + + def __call__(self): + """Perform 
cleanup""" + self._cleanup_containers() def get_dockerfiles_path(): @@ -115,7 +156,10 @@ def run_cmd(): run_cmd() # Get image id by reading the tag. It's guaranteed (except race condition) that the tag exists. Otherwise, the # check_call would have failed - return _get_local_image_id(docker_binary=docker_binary, docker_tag=tag) + image_id = _get_local_image_id(docker_binary=docker_binary, docker_tag=tag) + if not image_id: + raise FileNotFoundError('Unable to find docker image id matching with {}'.format(tag)) + return image_id def _get_local_image_id(docker_binary, docker_tag): @@ -137,10 +181,11 @@ def buildir() -> str: def default_ccache_dir() -> str: + """:return: ccache directory for the current platform""" # Share ccache across containers if 'CCACHE_DIR' in os.environ: + ccache_dir = os.path.realpath(os.environ['CCACHE_DIR']) try: - ccache_dir = os.path.realpath(os.environ['CCACHE_DIR']) os.makedirs(ccache_dir, exist_ok=True) return ccache_dir except PermissionError: @@ -154,14 +199,41 @@ def default_ccache_dir() -> str: return os.path.join(tempfile.gettempdir(), "ci_ccache") +def trim_container_id(cid): + """:return: trimmed container id""" + return cid[:12] + + def container_run(platform: str, - docker_binary: str, + nvidia_runtime: bool, docker_registry: str, shared_memory_size: str, local_ccache_dir: str, command: List[str], - dry_run: bool = False, - interactive: bool = False) -> int: + cleanup: Cleanup, + dry_run: bool = False) -> int: + """Run command in a container""" + container_wait_s = 600 + # + # Environment setup + # + environment = { + 'CCACHE_MAXSIZE': '500G', + 'CCACHE_TEMPDIR': '/tmp/ccache', # temp dir should be local and not shared + 'CCACHE_DIR': '/work/ccache', # this path is inside the container as /work/ccache is + # mounted + 'CCACHE_LOGFILE': '/tmp/ccache.log', # a container-scoped log, useful for ccache + # verification. 
+ } + # These variables are passed to the container to the process tree killer can find runaway + # process inside the container + # https://wiki.jenkins.io/display/JENKINS/ProcessTreeKiller + # https://github.com/jenkinsci/jenkins/blob/578d6bacb33a5e99f149de504c80275796f0b231/core/src/main/java/hudson/model/Run.java#L2393 + # + jenkins_env_vars = ['BUILD_NUMBER', 'BUILD_ID', 'BUILD_TAG'] + environment.update({k: os.environ[k] for k in jenkins_env_vars if k in os.environ}) + environment.update({k: os.environ[k] for k in ['CCACHE_MAXSIZE'] if k in os.environ}) + tag = get_docker_tag(platform=platform, registry=docker_registry) mx_root = get_mxnet_root() local_build_folder = buildir() @@ -169,39 +241,107 @@ def container_run(platform: str, os.makedirs(local_build_folder, exist_ok=True) os.makedirs(local_ccache_dir, exist_ok=True) logging.info("Using ccache directory: %s", local_ccache_dir) - runlist = [docker_binary, 'run', '--rm', '-t', - '--shm-size={}'.format(shared_memory_size), - '-v', "{}:/work/mxnet".format(mx_root), # mount mxnet root - '-v', "{}:/work/build".format(local_build_folder), # mount mxnet/build for storing build artifacts - '-v', "{}:/work/ccache".format(local_ccache_dir), - '-u', '{}:{}'.format(os.getuid(), os.getgid()), - '-e', 'CCACHE_MAXSIZE={}'.format(CCACHE_MAXSIZE), - '-e', 'CCACHE_TEMPDIR=/tmp/ccache', # temp dir should be local and not shared - '-e', "CCACHE_DIR=/work/ccache", # this path is inside the container as /work/ccache is mounted - '-e', "CCACHE_LOGFILE=/tmp/ccache.log", # a container-scoped log, useful for ccache verification. 
- tag] - runlist.extend(command) - cmd = '\\\n\t'.join(runlist) + docker_client = docker.from_env() + # Equivalent command + docker_cmd_list = [ + get_docker_binary(nvidia_runtime), + 'run', + '--rm', + '--shm-size={}'.format(shared_memory_size), + # mount mxnet root + '-v', "{}:/work/mxnet".format(mx_root), + # mount mxnet/build for storing build + '-v', "{}:/work/build".format(local_build_folder), + '-v', "{}:/work/ccache".format(local_ccache_dir), + '-u', '{}:{}'.format(os.getuid(), os.getgid()), + '-e', 'CCACHE_MAXSIZE={}'.format(environment['CCACHE_MAXSIZE']), + # temp dir should be local and not shared + '-e', 'CCACHE_TEMPDIR={}'.format(environment['CCACHE_TEMPDIR']), + # this path is inside the container as /work/ccache is mounted + '-e', "CCACHE_DIR={}".format(environment['CCACHE_DIR']), + # a container-scoped log, useful for ccache verification. + '-e', "CCACHE_LOGFILE={}".format(environment['CCACHE_LOGFILE']), + '-ti', + tag] + docker_cmd_list.extend(command) + docker_cmd = ' \\\n\t'.join(docker_cmd_list) + logging.info("Running %s in container %s", command, tag) + logging.info("Executing the equivalent of:\n%s\n", docker_cmd) + # return code of the command inside docker ret = 0 - if not dry_run and not interactive: - logging.info("Running %s in container %s", command, tag) - logging.info("Executing:\n%s\n", cmd) - ret = call(runlist) - - if not dry_run and interactive: - into_cmd = deepcopy(runlist) - # -ti can't be after the tag, as is interpreted as a command so hook it up after the -u argument - idx = into_cmd.index('-u') + 2 - into_cmd[idx:idx] = ['-ti'] - cmd = ' \\\n\t'.join(into_cmd) - logging.info("Executing:\n%s\n", cmd) - ret = call(into_cmd) - - if not dry_run and not interactive and ret != 0: - logging.error("Running of command in container failed (%s):\n%s\n", ret, cmd) - logging.error("You can get into the container by adding the -i option") - raise subprocess.CalledProcessError(ret, cmd) + if not dry_run: + ############################# + 
# + signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGINT, signal.SIGTERM}) + # noinspection PyShadowingNames + runtime = None + if nvidia_runtime: + # noinspection PyShadowingNames + # runc is default (docker info | grep -i runtime) + runtime = 'nvidia' + + container = docker_client.containers.run( + tag, + runtime=runtime, + detach=True, + command=command, + shm_size=shared_memory_size, + user='{}:{}'.format(os.getuid(), os.getgid()), + volumes={ + mx_root: + {'bind': '/work/mxnet', 'mode': 'rw'}, + local_build_folder: + {'bind': '/work/build', 'mode': 'rw'}, + local_ccache_dir: + {'bind': '/work/ccache', 'mode': 'rw'}, + }, + environment=environment) + logging.info("Started container: %s", trim_container_id(container.id)) + # Race condition: + # If the previous call is interrupted then it's possible that the container is not cleaned up + # We avoid by masking the signals temporarily + cleanup.add_container(container) + signal.pthread_sigmask(signal.SIG_UNBLOCK, {signal.SIGINT, signal.SIGTERM}) + # + ############################# + + stream = container.logs(stream=True, stdout=True, stderr=True) + sys.stdout.flush() + for chunk in stream: + sys.stdout.buffer.write(chunk) + sys.stdout.buffer.flush() + sys.stdout.flush() + stream.close() + try: + logging.info("Waiting for status of container %s for %d s.", + trim_container_id(container.id), + container_wait_s) + wait_result = container.wait(timeout=container_wait_s) + logging.info("Container exit status: %s", wait_result) + ret = wait_result.get('StatusCode', 200) + except Exception as e: + logging.exception(e) + ret = 150 + + # Stop + try: + logging.info("Stopping container: %s", trim_container_id(container.id)) + container.stop() + except Exception as e: + logging.exception(e) + ret = 151 + # Remove + try: + logging.info("Removing container: %s", trim_container_id(container.id)) + container.remove() + except Exception as e: + logging.exception(e) + ret = 152 + cleanup.remove_container(container) + containers = 
docker_client.containers.list() + if containers: + logging.info("Other running containers: %s", [trim_container_id(x.id) for x in containers]) return ret @@ -210,12 +350,13 @@ def list_platforms() -> str: def load_docker_cache(tag, docker_registry) -> None: + """Imports tagged container from the given docker registry""" if docker_registry: + # noinspection PyBroadException try: import docker_cache logging.info('Docker cache download is enabled from registry %s', docker_registry) docker_cache.load_docker_cache(registry=docker_registry, docker_tag=tag) - # noinspection PyBroadException except Exception: logging.exception('Unable to retrieve Docker cache. Continue without...') else: @@ -231,6 +372,7 @@ def log_environment(): def script_name() -> str: + """:returns: script name with leading paths removed""" return os.path.split(sys.argv[0])[1] @@ -274,10 +416,6 @@ def main() -> int: help="print docker run command for manual inspection", action='store_true') - parser.add_argument("-i", "--interactive", - help="go in a shell inside the container", - action='store_true') - parser.add_argument("-d", "--docker-registry", help="Dockerhub registry name to retrieve cache from. Default is 'mxnetci'", default='mxnetci', @@ -299,7 +437,7 @@ def main() -> int: parser.add_argument("--ccache-dir", default=default_ccache_dir(), - help="Ccache directory", + help="ccache directory", type=str) args = parser.parse_args() @@ -310,6 +448,20 @@ def use_cache(): command = list(chain(*args.command)) docker_binary = get_docker_binary(args.nvidiadocker) + # Cleanup on signals and exit + cleanup = Cleanup() + + def signal_handler(signum, _): + signal.pthread_sigmask(signal.SIG_BLOCK, {signum}) + logging.warning("Signal %d received, cleaning up...", signum) + cleanup() + logging.warning("done. 
Exiting with error.") + sys.exit(1) + + atexit.register(cleanup) + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + if args.list: print(list_platforms()) elif args.platform: @@ -323,38 +475,42 @@ def use_cache(): logging.warning("Container was just built. Exiting due to build-only.") return 0 + # noinspection PyUnusedLocal + ret = 0 if command: - container_run(platform=platform, docker_binary=docker_binary, shared_memory_size=args.shared_memory_size, - command=command, docker_registry=args.docker_registry, - local_ccache_dir=args.ccache_dir, interactive=args.interactive) + ret = container_run( + platform=platform, nvidia_runtime=args.nvidiadocker, + shared_memory_size=args.shared_memory_size, command=command, docker_registry=args.docker_registry, + local_ccache_dir=args.ccache_dir, cleanup=cleanup) elif args.print_docker_run: - print(container_run(platform=platform, docker_binary=docker_binary, shared_memory_size=args.shared_memory_size, - command=[], dry_run=True, docker_registry=args.docker_registry, - local_ccache_dir=args.ccache_dir)) - elif args.interactive: - container_run(platform=platform, docker_binary=docker_binary, shared_memory_size=args.shared_memory_size, - command=command, docker_registry=args.docker_registry, - local_ccache_dir=args.ccache_dir, interactive=args.interactive) - + command = [] + ret = container_run( + platform=platform, nvidia_runtime=args.nvidiadocker, + shared_memory_size=args.shared_memory_size, command=command, docker_registry=args.docker_registry, + local_ccache_dir=args.ccache_dir, dry_run=True, cleanup=cleanup) else: # With no commands, execute a build function for the target platform - assert not args.interactive, "when running with -i must provide a command" - cmd = ["/work/mxnet/ci/docker/runtime_functions.sh", "build_{}".format(platform)] - logging.info("No command specified, trying default build: %s", ' '.join(cmd)) - container_run(platform=platform, docker_binary=docker_binary, 
shared_memory_size=args.shared_memory_size, - command=cmd, docker_registry=args.docker_registry, - local_ccache_dir=args.ccache_dir) + command = ["/work/mxnet/ci/docker/runtime_functions.sh", "build_{}".format(platform)] + logging.info("No command specified, trying default build: %s", ' '.join(command)) + ret = container_run( + platform=platform, nvidia_runtime=args.nvidiadocker, + shared_memory_size=args.shared_memory_size, command=command, docker_registry=args.docker_registry, + local_ccache_dir=args.ccache_dir, cleanup=cleanup) + + if ret != 0: + logging.critical("Execution of %s failed with status: %d", command, ret) + return ret elif args.all: platforms = get_platforms() - logging.info("Building for all architectures: {}".format(platforms)) + logging.info("Building for all architectures: %s", platforms) logging.info("Artifacts will be produced in the build/ directory.") for platform in platforms: tag = get_docker_tag(platform=platform, registry=args.docker_registry) if use_cache(): load_docker_cache(tag=tag, docker_registry=args.docker_registry) - build_docker(platform, docker_binary, args.docker_registry, num_retries=args.docker_build_retries, - use_cache=use_cache()) + build_docker(platform, docker_binary=docker_binary, registry=args.docker_registry, + num_retries=args.docker_build_retries, use_cache=use_cache()) if args.build_only: continue shutil.rmtree(buildir(), ignore_errors=True) @@ -362,11 +518,13 @@ def use_cache(): plat_buildir = os.path.abspath(os.path.join(get_mxnet_root(), '..', "mxnet_{}".format(build_platform))) if os.path.exists(plat_buildir): - logging.warning("{} already exists, skipping".format(plat_buildir)) + logging.warning("%s already exists, skipping", plat_buildir) continue command = ["/work/mxnet/ci/docker/runtime_functions.sh", build_platform] - container_run(platform=platform, docker_binary=docker_binary, shared_memory_size=args.shared_memory_size, - command=command, docker_registry=args.docker_registry, 
local_ccache_dir=args.ccache_dir) + container_run( + platform=platform, nvidia_runtime=args.nvidiadocker, + shared_memory_size=args.shared_memory_size, command=command, docker_registry=args.docker_registry, + local_ccache_dir=args.ccache_dir, cleanup=cleanup) shutil.move(buildir(), plat_buildir) logging.info("Built files left in: %s", plat_buildir) @@ -389,13 +547,9 @@ def use_cache(): Will print a docker run command to get inside the container in a shell -./build.py -p armv7 --interactive - - Will execute a shell into the container - ./build.py -a - Builds for all platforms and leaves artifacts in build_. + Builds for all platforms and leaves artifacts in build_ """)