From ce02568f6ec81f7f5ab4867c36747d27fca60235 Mon Sep 17 00:00:00 2001 From: Pedro Larroy Date: Tue, 21 Aug 2018 18:33:02 +0200 Subject: [PATCH] A solution to prevent zombie containers locally and in CI --- ci/build.py | 283 ++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 232 insertions(+), 51 deletions(-) diff --git a/ci/build.py b/ci/build.py index f1a5e99e2d0e..169c9d57321a 100755 --- a/ci/build.py +++ b/ci/build.py @@ -23,16 +23,18 @@ """ __author__ = 'Marco de Abreu, Kellen Sunderland, Anton Chernov, Pedro Larroy' -__version__ = '0.2' +__version__ = '0.3' import argparse import glob import logging +import os import re import shutil import subprocess import sys import tempfile +import platform from copy import deepcopy from itertools import chain from subprocess import call, check_call, check_output @@ -40,12 +42,92 @@ from util import * import pprint import requests - - -CCACHE_MAXSIZE = '500G' - - -def get_dockerfiles_path(): +import docker +import docker.models +import docker.errors +import signal +import atexit + + +class Cleanup: + """A class to clean up containers""" + def __init__(self): + self.containers = set() + self.docker_stop_timeout = 3 + + def add_container(self, container: docker.models.containers.Container): + assert isinstance(container, docker.models.containers.Container) + self.containers.add(container) + + def remove_container(self, container: docker.models.containers.Container): + assert isinstance(container, docker.models.containers.Container) + self.containers.remove(container) + + def _cleanup_containers(self): + if self.containers: + logging.warning("Cleaning up containers") + else: + return + try: + stop_timeout = int(os.environ.get("DOCKER_STOP_TIMEOUT", self.docker_stop_timeout)) + except Exception: + stop_timeout = 3 + for container in self.containers: + try: + container.stop(timeout=stop_timeout) + logging.info("☠: stopped container %s", trim_container_id(container.id)) + container.remove() + logging.info("🚽: 
removed container %s", trim_container_id(container.id)) + except Exception as e: + logging.exception(e) + #pass + self.containers.clear() + logging.info("Cleaning up containers finished.") + + def __call__(self): + """Perform cleanup""" + self._cleanup_containers() + + + +def retry(ExceptionToCheck, tries=4, delay_s=1, backoff=2): + """Retry calling the decorated function using an exponential backoff. + + http://www.saltycrane.com/blog/2009/11/trying-out-retry-decorator-python/ + original from: http://wiki.python.org/moin/PythonDecoratorLibrary#Retry + + :param ExceptionToCheck: the exception to check. may be a tuple of + exceptions to check + :type ExceptionToCheck: Exception or tuple + :param tries: number of times to try (not retry) before giving up + :type tries: int + :param delay_s: initial delay between retries in seconds + :type delay_s: int + :param backoff: backoff multiplier e.g. value of 2 will double the delay + each retry + :type backoff: int + """ + import time + from functools import wraps + def decorated_retry(f): + @wraps(f) + def f_retry(*args, **kwargs): + mtries, mdelay = tries, delay_s + while mtries > 1: + try: + return f(*args, **kwargs) + except ExceptionToCheck as e: + logging.warning("Exception: %s, Retrying in %d seconds...", str(e), mdelay) + time.sleep(mdelay) + mtries -= 1 + mdelay *= backoff + return f(*args, **kwargs) + + return f_retry # true decorator + + return decorated_retry + +def get_dockerfiles_path() -> str: return "docker" @@ -146,22 +228,42 @@ def default_ccache_dir() -> str: except PermissionError: logging.info('Unable to make dirs at %s, falling back to local temp dir', ccache_dir) # In osx tmpdir is not mountable by default - import platform if platform.system() == 'Darwin': ccache_dir = "/tmp/_mxnet_ccache" os.makedirs(ccache_dir, exist_ok=True) return ccache_dir return os.path.join(tempfile.gettempdir(), "ci_ccache") +def trim_container_id(cid): + return cid[:12] def container_run(platform: str, - docker_binary: str, 
+ nvidia_runtime: bool, docker_registry: str, shared_memory_size: str, - local_ccache_dir: str, command: List[str], + local_ccache_dir: str, + cleanup: Cleanup, dry_run: bool = False, interactive: bool = False) -> int: + CONTAINER_WAIT_S = 600 + # + # Environment setup + # + environment = { + 'CCACHE_MAXSIZE': '500G', + 'CCACHE_TEMPDIR': '/tmp/ccache', # temp dir should be local and not shared + 'CCACHE_DIR': '/work/ccache', # this path is inside the container as /work/ccache is mounted + 'CCACHE_LOGFILE': '/tmp/ccache.log', # a container-scoped log, useful for ccache verification. + } + # These variables are passed to the container so the process tree killer can find runaway processes inside the container + # https://wiki.jenkins.io/display/JENKINS/ProcessTreeKiller + # https://github.com/jenkinsci/jenkins/blob/578d6bacb33a5e99f149de504c80275796f0b231/core/src/main/java/hudson/model/Run.java#L2393 + # + JENKINS_ENV_VARS = ['BUILD_NUMBER', 'BUILD_ID', 'BUILD_TAG'] + environment.update({k: os.environ[k] for k in JENKINS_ENV_VARS if k in os.environ}) + environment.update({k: os.environ[k] for k in ['CCACHE_MAXSIZE'] if k in os.environ}) + tag = get_docker_tag(platform=platform, registry=docker_registry) mx_root = get_mxnet_root() local_build_folder = buildir() @@ -169,39 +271,99 @@ def container_run(platform: str, os.makedirs(local_build_folder, exist_ok=True) os.makedirs(local_ccache_dir, exist_ok=True) logging.info("Using ccache directory: %s", local_ccache_dir) - runlist = [docker_binary, 'run', '--rm', '-t', + docker_client = docker.from_env() + # Equivalent command + docker_cmd_list = [get_docker_binary(nvidia_runtime), 'run', + '--rm', '--shm-size={}'.format(shared_memory_size), '-v', "{}:/work/mxnet".format(mx_root), # mount mxnet root '-v', "{}:/work/build".format(local_build_folder), # mount mxnet/build for storing build artifacts '-v', "{}:/work/ccache".format(local_ccache_dir), '-u', '{}:{}'.format(os.getuid(), os.getgid()), - '-e', 
'CCACHE_MAXSIZE={}'.format(CCACHE_MAXSIZE), - '-e', 'CCACHE_TEMPDIR=/tmp/ccache', # temp dir should be local and not shared - '-e', "CCACHE_DIR=/work/ccache", # this path is inside the container as /work/ccache is mounted - '-e', "CCACHE_LOGFILE=/tmp/ccache.log", # a container-scoped log, useful for ccache verification. + '-e', 'CCACHE_MAXSIZE={}'.format(environment['CCACHE_MAXSIZE']), + '-e', 'CCACHE_TEMPDIR={}'.format(environment['CCACHE_TEMPDIR']), # temp dir should be local and not shared + '-e', "CCACHE_DIR={}".format(environment['CCACHE_DIR']), # this path is inside the container as /work/ccache is mounted + '-e', "CCACHE_LOGFILE={}".format(environment['CCACHE_LOGFILE']), # a container-scoped log, useful for ccache verification. + '-ti', tag] - runlist.extend(command) - cmd = '\\\n\t'.join(runlist) - ret = 0 - if not dry_run and not interactive: - logging.info("Running %s in container %s", command, tag) - logging.info("Executing:\n%s\n", cmd) - ret = call(runlist) - - if not dry_run and interactive: - into_cmd = deepcopy(runlist) - # -ti can't be after the tag, as is interpreted as a command so hook it up after the -u argument - idx = into_cmd.index('-u') + 2 - into_cmd[idx:idx] = ['-ti'] - cmd = ' \\\n\t'.join(into_cmd) - logging.info("Executing:\n%s\n", cmd) - ret = call(into_cmd) - - if not dry_run and not interactive and ret != 0: - logging.error("Running of command in container failed (%s):\n%s\n", ret, cmd) - logging.error("You can get into the container by adding the -i option") - raise subprocess.CalledProcessError(ret, cmd) + docker_cmd_list.extend(command) + docker_cmd = ' \\\n\t'.join(docker_cmd_list) + logging.info("Running %s in container %s", command, tag) + logging.info("Executing the equivalent of:\n%s\n", docker_cmd) + ret = 0 # return code of the command inside docker + if not dry_run: + + + ############################# + # + signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGINT, signal.SIGTERM}) + if nvidia_runtime: + runtime='nvidia' + 
else: + # runc is default (docker info | grep -i runtime) + runtime=None + + container = docker_client.containers.run( + tag, + runtime=runtime, + detach=True, + command=command, + #auto_remove=True, + shm_size=shared_memory_size, + user='{}:{}'.format(os.getuid(), os.getgid()), + volumes={ + mx_root: + {'bind': '/work/mxnet', 'mode': 'rw'}, + local_build_folder: + {'bind': '/work/build', 'mode': 'rw'}, + local_ccache_dir: + {'bind': '/work/ccache', 'mode': 'rw'}, + }, + environment=environment) + logging.info("Started container: %s", trim_container_id(container.id)) + # Race condition: + # If the previous call is interrupted then it's possible that the container is not cleaned up + # We avoid this by masking the signals temporarily + cleanup.add_container(container) + signal.pthread_sigmask(signal.SIG_UNBLOCK, {signal.SIGINT, signal.SIGTERM}) + # + ############################# + + stream = container.logs(stream=True, stdout=True, stderr=True) + sys.stdout.flush() + for chunk in stream: + sys.stdout.buffer.write(chunk) + sys.stdout.buffer.flush() + sys.stdout.flush() + stream.close() + try: + logging.info("Waiting for status of container %s for %d s.", trim_container_id(container.id), CONTAINER_WAIT_S) + wait_result = container.wait(timeout=CONTAINER_WAIT_S) + logging.info("Container exit status: %s", wait_result) + ret = wait_result.get('StatusCode', 200) + except Exception as e: + logging.exception(e) + ret = 150 + + # Stop + try: + logging.info("Stopping container: %s", trim_container_id(container.id)) + container.stop() + except Exception as e: + logging.exception(e) + ret = 151 + # Remove + try: + logging.info("Removing container: %s", trim_container_id(container.id)) + container.remove() + except Exception as e: + logging.exception(e) + ret = 152 + cleanup.remove_container(container) + containers = docker_client.containers.list() + if containers: + logging.info("Other running containers: %s", [trim_container_id(x.id) for x in containers]) return ret @@ -310,6 
+472,19 @@ def use_cache(): command = list(chain(*args.command)) docker_binary = get_docker_binary(args.nvidiadocker) + # Cleanup on signals and exit + cleanup = Cleanup() + def signal_handler(signum, _): + signal.pthread_sigmask(signal.SIG_BLOCK, {signum}) + logging.warning("Signal %d received, cleaning up...", signum) + cleanup() + logging.warning("done. Exiting with error.") + sys.exit(1) + + atexit.register(cleanup) + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGINT, signal_handler) + if args.list: print(list_platforms()) elif args.platform: @@ -323,27 +498,32 @@ def use_cache(): logging.warning("Container was just built. Exiting due to build-only.") return 0 + ret = 0 if command: - container_run(platform=platform, docker_binary=docker_binary, shared_memory_size=args.shared_memory_size, - command=command, docker_registry=args.docker_registry, - local_ccache_dir=args.ccache_dir, interactive=args.interactive) + ret = container_run(platform=platform, nvidia_runtime=args.nvidiadocker, + shared_memory_size=args.shared_memory_size, command=command, docker_registry=args.docker_registry, + local_ccache_dir=args.ccache_dir, interactive=args.interactive, cleanup=cleanup) elif args.print_docker_run: - print(container_run(platform=platform, docker_binary=docker_binary, shared_memory_size=args.shared_memory_size, - command=[], dry_run=True, docker_registry=args.docker_registry, - local_ccache_dir=args.ccache_dir)) + ret = container_run(platform=platform, nvidia_runtime=args.nvidiadocker, + shared_memory_size=args.shared_memory_size, command=[], dry_run=True, docker_registry=args.docker_registry, + local_ccache_dir=args.ccache_dir, cleanup=cleanup) + command=[] elif args.interactive: - container_run(platform=platform, docker_binary=docker_binary, shared_memory_size=args.shared_memory_size, + ret = container_run(platform=platform, nvidia_runtime=args.nvidiadocker, shared_memory_size=args.shared_memory_size, command=command, docker_registry=args.docker_registry, - 
local_ccache_dir=args.ccache_dir, interactive=args.interactive) - + local_ccache_dir=args.ccache_dir, interactive=args.interactive, cleanup=cleanup) else: # With no commands, execute a build function for the target platform assert not args.interactive, "when running with -i must provide a command" - cmd = ["/work/mxnet/ci/docker/runtime_functions.sh", "build_{}".format(platform)] - logging.info("No command specified, trying default build: %s", ' '.join(cmd)) - container_run(platform=platform, docker_binary=docker_binary, shared_memory_size=args.shared_memory_size, - command=cmd, docker_registry=args.docker_registry, - local_ccache_dir=args.ccache_dir) + + command = ["/work/mxnet/ci/docker/runtime_functions.sh", "build_{}".format(platform)] + logging.info("No command specified, trying default build: %s", ' '.join(command)) + ret = container_run(platform=platform, nvidia_runtime=args.nvidiadocker, shared_memory_size=args.shared_memory_size, + command=command, docker_registry=args.docker_registry, local_ccache_dir=args.ccache_dir, cleanup=cleanup) + + if ret != 0: + logging.critical("Execution of %s failed with status: %d", command, ret) + return(ret) elif args.all: platforms = get_platforms() @@ -365,7 +545,8 @@ def use_cache(): logging.warning("{} already exists, skipping".format(plat_buildir)) continue command = ["/work/mxnet/ci/docker/runtime_functions.sh", build_platform] - container_run(platform=platform, docker_binary=docker_binary, shared_memory_size=args.shared_memory_size, + # BUILD in docker + container_run(platform=platform, nvidia_runtime=args.nvidiadocker, shared_memory_size=args.shared_memory_size, cleanup=cleanup, command=command, docker_registry=args.docker_registry, local_ccache_dir=args.ccache_dir) shutil.move(buildir(), plat_buildir) logging.info("Built files left in: %s", plat_buildir)