diff --git a/ci/jenkins_tests/multi_node_tests/large_memory_test.py b/ci/jenkins_tests/miscellaneous/large_memory_test.py similarity index 91% rename from ci/jenkins_tests/multi_node_tests/large_memory_test.py rename to ci/jenkins_tests/miscellaneous/large_memory_test.py index 4ac34d2cfdf2..9421c15b06b6 100644 --- a/ci/jenkins_tests/multi_node_tests/large_memory_test.py +++ b/ci/jenkins_tests/miscellaneous/large_memory_test.py @@ -32,6 +32,9 @@ del c print("Successfully put C.") + # The below code runs successfully, but when commented in, the whole test + # takes about 10 minutes. + # D = (2 ** 30 + 1) * ["h"] # d = ray.put(D) # assert ray.get(d) == D diff --git a/ci/jenkins_tests/multi_node_tests/test_wait_hanging.py b/ci/jenkins_tests/miscellaneous/test_wait_hanging.py similarity index 100% rename from ci/jenkins_tests/multi_node_tests/test_wait_hanging.py rename to ci/jenkins_tests/miscellaneous/test_wait_hanging.py diff --git a/ci/jenkins_tests/multi_node_docker_test.py b/ci/jenkins_tests/multi_node_docker_test.py deleted file mode 100644 index 4b22dafc7418..000000000000 --- a/ci/jenkins_tests/multi_node_docker_test.py +++ /dev/null @@ -1,441 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import argparse -import datetime -import os -import random -import re -import signal -import subprocess -import sys - - -# This is duplicated from ray.utils so that we do not have to introduce a -# dependency on Ray to run this file. -def decode(byte_str): - """Make this unicode in Python 3, otherwise leave it as bytes.""" - if not isinstance(byte_str, bytes): - raise ValueError("The argument must be a bytes object.") - if sys.version_info >= (3, 0): - return byte_str.decode("ascii") - else: - return byte_str - - -def wait_for_output(proc): - """This is a convenience method to parse a process's stdout and stderr. - - Args: - proc: A process started by subprocess.Popen. - - Returns: - A tuple of the stdout and stderr of the process as strings. - """ - try: - # NOTE: This test must be run with Python 3. - stdout_data, stderr_data = proc.communicate(timeout=200) - except subprocess.TimeoutExpired: - # Timeout: kill the process. - # Get the remaining message from PIPE for debugging purpose. - print("Killing process because it timed out.") - proc.kill() - stdout_data, stderr_data = proc.communicate() - - if stdout_data is not None: - try: - # NOTE(rkn): This try/except block is here because I once saw an - # exception raised here and want to print more information if that - # happens again. - stdout_data = decode(stdout_data) - except UnicodeDecodeError: - raise Exception("Failed to decode stdout_data:", stdout_data) - - if stderr_data is not None: - try: - # NOTE(rkn): This try/except block is here because I once saw an - # exception raised here and want to print more information if that - # happens again. - stderr_data = decode(stderr_data) - except UnicodeDecodeError: - raise Exception("Failed to decode stderr_data:", stderr_data) - - return stdout_data, stderr_data - - -class DockerRunner(object): - """This class manages the logistics of running multiple nodes in Docker. - - This class is used for starting multiple Ray nodes within Docker, stopping - Ray, running a workload, and determining the success or failure of the - workload. - - Attributes: - head_container_id: The ID of the docker container that runs the head - node. - worker_container_ids: A list of the docker container IDs of the Ray - worker nodes. 
- head_container_ip: The IP address of the docker container that runs the - head node. - """ - - def __init__(self): - """Initialize the DockerRunner.""" - self.head_container_id = None - self.worker_container_ids = [] - self.head_container_ip = None - - def _get_container_id(self, stdout_data): - """Parse the docker container ID from stdout_data. - - Args: - stdout_data: This should be a string with the standard output of a - call to a docker command. - - Returns: - The container ID of the docker container. - """ - p = re.compile("([0-9a-f]{64})\n") - m = p.match(stdout_data) - if m is None: - return None - else: - return m.group(1) - - def _get_container_ip(self, container_id): - """Get the IP address of a specific docker container. - - Args: - container_id: The docker container ID of the relevant docker - container. - - Returns: - The IP address of the container. - """ - proc = subprocess.Popen( - [ - "docker", "inspect", - "--format={{.NetworkSettings.Networks.bridge" - ".IPAddress}}", container_id - ], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - stdout_data, _ = wait_for_output(proc) - p = re.compile("([0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3})") - m = p.match(stdout_data) - if m is None: - raise RuntimeError("Container IP not found.") - else: - return m.group(1) - - def _start_head_node(self, docker_image, mem_size, shm_size, - num_redis_shards, num_cpus, num_gpus, - development_mode): - """Start the Ray head node inside a docker container.""" - mem_arg = ["--memory=" + mem_size] if mem_size else [] - shm_arg = ["--shm-size=" + shm_size] if shm_size else [] - volume_arg = ([ - "-v", "{}:{}".format( - os.path.dirname(os.path.realpath(__file__)), - "/ray/test/jenkins_tests") - ] if development_mode else []) - - command = (["docker", "run", "-d"] + mem_arg + shm_arg + volume_arg + [ - docker_image, "ray", "start", "--head", "--block", - "--redis-port=6379", - "--num-redis-shards={}".format(num_redis_shards), - "--num-cpus={}".format(num_cpus), "--num-gpus={}".format(num_gpus), - "--no-ui" - ]) - print("Starting head node with command:{}".format(command)) - - proc = subprocess.Popen( - command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout_data, _ = wait_for_output(proc) - container_id = self._get_container_id(stdout_data) - if container_id is None: - raise RuntimeError("Failed to find container ID.") - self.head_container_id = container_id - self.head_container_ip = self._get_container_ip(container_id) - - def _start_worker_node(self, docker_image, mem_size, shm_size, num_cpus, - num_gpus, development_mode): - """Start a Ray worker node inside a docker container.""" - mem_arg = ["--memory=" + mem_size] if mem_size else [] - shm_arg = ["--shm-size=" + shm_size] if shm_size else [] - volume_arg = ([ - "-v", "{}:{}".format( - os.path.dirname(os.path.realpath(__file__)), - "/ray/test/jenkins_tests") - ] if development_mode else []) - command = (["docker", "run", "-d"] + mem_arg + shm_arg + volume_arg + [ - "--shm-size=" + shm_size, docker_image, "ray", "start", "--block", - "--redis-address={:s}:6379".format(self.head_container_ip), - "--num-cpus={}".format(num_cpus), "--num-gpus={}".format(num_gpus) - ]) - print("Starting worker node with command:{}".format(command)) - proc = subprocess.Popen( - command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout_data, _ = wait_for_output(proc) - container_id = self._get_container_id(stdout_data) - if container_id is None: - raise RuntimeError("Failed to find container id") - 
self.worker_container_ids.append(container_id) - - def start_ray(self, - docker_image=None, - mem_size=None, - shm_size=None, - num_nodes=None, - num_redis_shards=1, - num_cpus=None, - num_gpus=None, - development_mode=None): - """Start a Ray cluster within docker. - - This starts one docker container running the head node and - num_nodes - 1 docker containers running the Ray worker nodes. - - Args: - docker_image: The docker image to use for all of the nodes. - mem_size: The amount of memory to start each docker container with. - This will be passed into `docker run` as the --memory flag. If - this is None, then no --memory flag will be used. - shm_size: The amount of shared memory to start each docker - container with. This will be passed into `docker run` as the - `--shm-size` flag. - num_nodes: The number of nodes to use in the cluster (this counts - the head node as well). - num_redis_shards: The number of Redis shards to use on the head - node. - num_cpus: A list of the number of CPUs to start each node with. - num_gpus: A list of the number of GPUs to start each node with. - development_mode: True if you want to mount the local copy of - test/jenkins_test on the head node so we can avoid rebuilding - docker images during development. - """ - assert len(num_cpus) == num_nodes - assert len(num_gpus) == num_nodes - - # Launch the head node. - self._start_head_node(docker_image, mem_size, shm_size, - num_redis_shards, num_cpus[0], num_gpus[0], - development_mode) - # Start the worker nodes. - for i in range(num_nodes - 1): - self._start_worker_node(docker_image, mem_size, shm_size, - num_cpus[1 + i], num_gpus[1 + i], - development_mode) - - def _stop_node(self, container_id): - """Stop a node in the Ray cluster.""" - proc = subprocess.Popen( - ["docker", "kill", container_id], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - stdout_data, _ = wait_for_output(proc) - stopped_container_id = self._get_container_id(stdout_data) - if not container_id == stopped_container_id: - raise Exception("Failed to stop container {}." - .format(container_id)) - - proc = subprocess.Popen( - ["docker", "rm", "-f", container_id], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - stdout_data, _ = wait_for_output(proc) - removed_container_id = self._get_container_id(stdout_data) - if not container_id == removed_container_id: - raise Exception("Failed to remove container {}." - .format(container_id)) - - print( - "stop_node", { - "container_id": container_id, - "is_head": container_id == self.head_container_id - }) - - def stop_ray(self): - """Stop the Ray cluster.""" - success = True - - try: - self._stop_node(self.head_container_id) - except Exception: - success = False - - for container_id in self.worker_container_ids: - try: - self._stop_node(container_id) - except Exception: - success = False - - return success - - def run_test(self, - test_script, - num_drivers, - driver_locations=None, - timeout_seconds=600): - """Run a test script. - - Run a test using the Ray cluster. - - Args: - test_script: The test script to run. - num_drivers: The number of copies of the test script to run. - driver_locations: A list of the indices of the containers that the - different copies of the test script should be run on. If this - is None, then the containers will be chosen randomly. - timeout_seconds: The amount of time in seconds to wait before - considering the test to have failed. When the timeout expires, - this will cause this function to raise an exception. 
- - Returns: - A dictionary with information about the test script run. - - Raises: - Exception: An exception is raised if the timeout expires. - """ - print("Multi-node docker test started at: {}".format( - datetime.datetime.now())) - all_container_ids = ( - [self.head_container_id] + self.worker_container_ids) - if driver_locations is None: - driver_locations = [ - random.randrange(0, len(all_container_ids)) - for i in range(num_drivers) - ] - print("driver_locations: {}".format(driver_locations)) - - # Define a signal handler and set an alarm to go off in - # timeout_seconds. - def handler(signum, frame): - raise RuntimeError("This test timed out after {} seconds." - .format(timeout_seconds)) - - signal.signal(signal.SIGALRM, handler) - signal.alarm(timeout_seconds) - - # Start the different drivers. - driver_processes = [] - for i in range(len(driver_locations)): - # Get the container ID to run the ith driver in. - container_id = all_container_ids[driver_locations[i]] - command = [ - "docker", "exec", container_id, "/bin/bash", "-c", - ("RAY_REDIS_ADDRESS={}:6379 RAY_DRIVER_INDEX={} " - "python {}".format(self.head_container_ip, i, test_script)) - ] - print("Starting driver with command {}.".format(test_script)) - # Start the driver. - p = subprocess.Popen( - command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - driver_processes.append(p) - - results = [] - for p in driver_processes: - stdout_data, stderr_data = wait_for_output(p) - print("STDOUT:") - print(stdout_data) - print("STDERR:") - print(stderr_data) - results.append({ - "success": p.returncode == 0, - "return_code": p.returncode - }) - - # Disable the alarm. - signal.alarm(0) - print("Multi-node docker test ended at: {}".format( - datetime.datetime.now())) - return results - - -if __name__ == "__main__": - parser = argparse.ArgumentParser( - description="Run multinode tests in Docker.") - parser.add_argument( - "--docker-image", default="ray-project/deploy", help="docker image") - parser.add_argument("--mem-size", help="memory size") - parser.add_argument("--shm-size", default="1G", help="shared memory size") - parser.add_argument( - "--num-nodes", - default=1, - type=int, - help="number of nodes to use in the cluster") - parser.add_argument( - "--num-redis-shards", - default=1, - type=int, - help=("the number of Redis shards to start on the " - "head node")) - parser.add_argument( - "--num-cpus", - type=str, - help=("a comma separated list of values representing " - "the number of CPUs to start each node with")) - parser.add_argument( - "--num-gpus", - type=str, - help=("a comma separated list of values representing " - "the number of GPUs to start each node with")) - parser.add_argument( - "--num-drivers", default=1, type=int, help="number of drivers to run") - parser.add_argument( - "--driver-locations", - type=str, - help=("a comma separated list of indices of the " - "containers to run the drivers in")) - parser.add_argument("--test-script", required=True, help="test script") - parser.add_argument( - "--development-mode", - action="store_true", - help="use local copies of the test scripts") - args = parser.parse_args() - - # Parse the number of CPUs and GPUs to use for each worker. - num_nodes = args.num_nodes - num_cpus = ([int(i) for i in args.num_cpus.split(",")] - if args.num_cpus is not None else num_nodes * [10]) - num_gpus = ([int(i) for i in args.num_gpus.split(",")] - if args.num_gpus is not None else num_nodes * [0]) - - # Parse the driver locations. 
- driver_locations = (None if args.driver_locations is None else - [int(i) for i in args.driver_locations.split(",")]) - - d = DockerRunner() - d.start_ray( - docker_image=args.docker_image, - mem_size=args.mem_size, - shm_size=args.shm_size, - num_nodes=num_nodes, - num_redis_shards=args.num_redis_shards, - num_cpus=num_cpus, - num_gpus=num_gpus, - development_mode=args.development_mode) - try: - run_results = d.run_test( - args.test_script, - args.num_drivers, - driver_locations=driver_locations) - finally: - successfully_stopped = d.stop_ray() - - any_failed = False - for run_result in run_results: - if "success" in run_result and run_result["success"]: - print("RESULT: Test {} succeeded.".format(args.test_script)) - else: - print("RESULT: Test {} failed.".format(args.test_script)) - any_failed = True - - if any_failed: - sys.exit(1) - elif not successfully_stopped: - print("There was a failure when attempting to stop the containers.") - sys.exit(1) - else: - sys.exit(0) diff --git a/ci/jenkins_tests/multi_node_tests/many_drivers_test.py b/ci/jenkins_tests/multi_node_tests/many_drivers_test.py deleted file mode 100644 index 585c3806103a..000000000000 --- a/ci/jenkins_tests/multi_node_tests/many_drivers_test.py +++ /dev/null @@ -1,79 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import os -import time - -import ray -from ray.tests.utils import (_wait_for_nodes_to_join, _broadcast_event, - _wait_for_event) - -# This test should be run with 5 nodes, which have 0, 0, 5, 6, and 50 GPUs for -# a total of 61 GPUs. It should be run with a large number of drivers (e.g., -# 100). At most 10 drivers will run at a time, and each driver will use at most -# 5 GPUs (this is ceil(61 / 15), which guarantees that we will always be able -# to make progress). -total_num_nodes = 5 -max_concurrent_drivers = 15 -num_gpus_per_driver = 5 - - -@ray.remote(num_cpus=0, num_gpus=1) -class Actor1(object): - def __init__(self): - assert len(ray.get_gpu_ids()) == 1 - - def check_ids(self): - assert len(ray.get_gpu_ids()) == 1 - - -def driver(redis_address, driver_index): - """The script for all drivers. - - This driver should create five actors that each use one GPU. After a while, - it should exit. - """ - ray.init(redis_address=redis_address) - - # Wait for all the nodes to join the cluster. - _wait_for_nodes_to_join(total_num_nodes) - - # Limit the number of drivers running concurrently. - for i in range(driver_index - max_concurrent_drivers + 1): - _wait_for_event("DRIVER_{}_DONE".format(i), redis_address) - - def try_to_create_actor(actor_class, timeout=500): - # Try to create an actor, but allow failures while we wait for the - # monitor to release the resources for the removed drivers. - start_time = time.time() - while time.time() - start_time < timeout: - try: - actor = actor_class.remote() - except Exception: - time.sleep(0.1) - else: - return actor - # If we are here, then we timed out while looping. - raise Exception("Timed out while trying to create actor.") - - # Create some actors that require one GPU. 
- actors_one_gpu = [] - for _ in range(num_gpus_per_driver): - actors_one_gpu.append(try_to_create_actor(Actor1)) - - for _ in range(100): - ray.get([actor.check_ids.remote() for actor in actors_one_gpu]) - - _broadcast_event("DRIVER_{}_DONE".format(driver_index), redis_address) - - -if __name__ == "__main__": - driver_index = int(os.environ["RAY_DRIVER_INDEX"]) - redis_address = os.environ["RAY_REDIS_ADDRESS"] - print("Driver {} started at {}.".format(driver_index, time.time())) - - # In this test, all drivers will run the same script. - driver(redis_address, driver_index) - - print("Driver {} finished at {}.".format(driver_index, time.time())) diff --git a/ci/jenkins_tests/multi_node_tests/remove_driver_test.py b/ci/jenkins_tests/multi_node_tests/remove_driver_test.py deleted file mode 100644 index 1cd10195b607..000000000000 --- a/ci/jenkins_tests/multi_node_tests/remove_driver_test.py +++ /dev/null @@ -1,274 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import os -import time - -import ray -from ray.tests.utils import (_wait_for_nodes_to_join, _broadcast_event, - _wait_for_event, wait_for_pid_to_exit) - -# This test should be run with 5 nodes, which have 0, 1, 2, 3, and 4 GPUs for a -# total of 10 GPUs. It should be run with 7 drivers. Drivers 2 through 6 must -# run on different nodes so they can check if all the relevant workers on all -# the nodes have been killed. -total_num_nodes = 5 - - -def actor_event_name(driver_index, actor_index): - return "DRIVER_{}_ACTOR_{}_RUNNING".format(driver_index, actor_index) - - -def remote_function_event_name(driver_index, task_index): - return "DRIVER_{}_TASK_{}_RUNNING".format(driver_index, task_index) - - -@ray.remote -def long_running_task(driver_index, task_index, redis_address): - _broadcast_event( - remote_function_event_name(driver_index, task_index), - redis_address, - data=(ray.services.get_node_ip_address(), os.getpid())) - # Loop forever. - while True: - time.sleep(100) - - -num_long_running_tasks_per_driver = 2 - - -@ray.remote -class Actor0(object): - def __init__(self, driver_index, actor_index, redis_address): - _broadcast_event( - actor_event_name(driver_index, actor_index), - redis_address, - data=(ray.services.get_node_ip_address(), os.getpid())) - assert len(ray.get_gpu_ids()) == 0 - - def check_ids(self): - assert len(ray.get_gpu_ids()) == 0 - - def long_running_method(self): - # Loop forever. - while True: - time.sleep(100) - - -@ray.remote(num_gpus=1) -class Actor1(object): - def __init__(self, driver_index, actor_index, redis_address): - _broadcast_event( - actor_event_name(driver_index, actor_index), - redis_address, - data=(ray.services.get_node_ip_address(), os.getpid())) - assert len(ray.get_gpu_ids()) == 1 - - def check_ids(self): - assert len(ray.get_gpu_ids()) == 1 - - def long_running_method(self): - # Loop forever. - while True: - time.sleep(100) - - -@ray.remote(num_gpus=2) -class Actor2(object): - def __init__(self, driver_index, actor_index, redis_address): - _broadcast_event( - actor_event_name(driver_index, actor_index), - redis_address, - data=(ray.services.get_node_ip_address(), os.getpid())) - assert len(ray.get_gpu_ids()) == 2 - - def check_ids(self): - assert len(ray.get_gpu_ids()) == 2 - - def long_running_method(self): - # Loop forever. - while True: - time.sleep(100) - - -def driver_0(redis_address, driver_index): - """The script for driver 0. 
- - This driver should create five actors that each use one GPU and some actors - that use no GPUs. After a while, it should exit. - """ - ray.init(redis_address=redis_address) - - # Wait for all the nodes to join the cluster. - _wait_for_nodes_to_join(total_num_nodes) - - # Start some long running task. Driver 2 will make sure the worker running - # this task has been killed. - for i in range(num_long_running_tasks_per_driver): - long_running_task.remote(driver_index, i, redis_address) - - # Create some actors that require one GPU. - actors_one_gpu = [ - Actor1.remote(driver_index, i, redis_address) for i in range(5) - ] - # Create some actors that don't require any GPUs. - actors_no_gpus = [ - Actor0.remote(driver_index, 5 + i, redis_address) for i in range(5) - ] - - for _ in range(1000): - ray.get([actor.check_ids.remote() for actor in actors_one_gpu]) - ray.get([actor.check_ids.remote() for actor in actors_no_gpus]) - - # Start a long-running method on one actor and make sure this doesn't - # affect anything. - actors_no_gpus[0].long_running_method.remote() - - _broadcast_event("DRIVER_0_DONE", redis_address) - - -def driver_1(redis_address, driver_index): - """The script for driver 1. - - This driver should create one actor that uses two GPUs, three actors that - each use one GPU (the one requiring two must be created first), and some - actors that don't use any GPUs. After a while, it should exit. - """ - ray.init(redis_address=redis_address) - - # Wait for all the nodes to join the cluster. - _wait_for_nodes_to_join(total_num_nodes) - - # Start some long running task. Driver 2 will make sure the worker running - # this task has been killed. - for i in range(num_long_running_tasks_per_driver): - long_running_task.remote(driver_index, i, redis_address) - - # Create an actor that requires two GPUs. - actors_two_gpus = [ - Actor2.remote(driver_index, i, redis_address) for i in range(1) - ] - # Create some actors that require one GPU. - actors_one_gpu = [ - Actor1.remote(driver_index, 1 + i, redis_address) for i in range(3) - ] - # Create some actors that don't require any GPUs. - actors_no_gpus = [ - Actor0.remote(driver_index, 1 + 3 + i, redis_address) for i in range(5) - ] - - for _ in range(1000): - ray.get([actor.check_ids.remote() for actor in actors_two_gpus]) - ray.get([actor.check_ids.remote() for actor in actors_one_gpu]) - ray.get([actor.check_ids.remote() for actor in actors_no_gpus]) - - # Start a long-running method on one actor and make sure this doesn't - # affect anything. - actors_one_gpu[0].long_running_method.remote() - - _broadcast_event("DRIVER_1_DONE", redis_address) - - -def cleanup_driver(redis_address, driver_index): - """The script for drivers 2 through 6. - - This driver should wait for the first two drivers to finish. Then it should - create some actors that use a total of ten GPUs. - """ - ray.init(redis_address=redis_address) - - # Only one of the cleanup drivers should create more actors. - if driver_index == 2: - # We go ahead and create some actors that don't require any GPUs. We - # don't need to wait for the other drivers to finish. We call methods - # on these actors later to make sure they haven't been killed. 
- actors_no_gpus = [ - Actor0.remote(driver_index, i, redis_address) for i in range(10) - ] - - _wait_for_event("DRIVER_0_DONE", redis_address) - _wait_for_event("DRIVER_1_DONE", redis_address) - - def try_to_create_actor(actor_class, driver_index, actor_index, - timeout=20): - # Try to create an actor, but allow failures while we wait for the - # monitor to release the resources for the removed drivers. - start_time = time.time() - while time.time() - start_time < timeout: - try: - actor = actor_class.remote(driver_index, actor_index, - redis_address) - except Exception: - time.sleep(0.1) - else: - return actor - # If we are here, then we timed out while looping. - raise Exception("Timed out while trying to create actor.") - - # Only one of the cleanup drivers should create more actors. - if driver_index == 2: - # Create some actors that require one GPU. - actors_one_gpu = [] - for i in range(10): - actors_one_gpu.append( - try_to_create_actor(Actor1, driver_index, 10 + 3 + i)) - - removed_workers = 0 - - # Make sure that the PIDs for the long-running tasks from driver 0 and - # driver 1 have been killed. - for i in range(num_long_running_tasks_per_driver): - node_ip_address, pid = _wait_for_event( - remote_function_event_name(0, i), redis_address) - if node_ip_address == ray.services.get_node_ip_address(): - wait_for_pid_to_exit(pid) - removed_workers += 1 - for i in range(num_long_running_tasks_per_driver): - node_ip_address, pid = _wait_for_event( - remote_function_event_name(1, i), redis_address) - if node_ip_address == ray.services.get_node_ip_address(): - wait_for_pid_to_exit(pid) - removed_workers += 1 - # Make sure that the PIDs for the actors from driver 0 and driver 1 have - # been killed. - for i in range(10): - node_ip_address, pid = _wait_for_event( - actor_event_name(0, i), redis_address) - if node_ip_address == ray.services.get_node_ip_address(): - wait_for_pid_to_exit(pid) - removed_workers += 1 - for i in range(9): - node_ip_address, pid = _wait_for_event( - actor_event_name(1, i), redis_address) - if node_ip_address == ray.services.get_node_ip_address(): - wait_for_pid_to_exit(pid) - removed_workers += 1 - - print("{} workers/actors were removed on this node." - .format(removed_workers)) - - # Only one of the cleanup drivers should create and use more actors. 
- if driver_index == 2: - for _ in range(1000): - ray.get([actor.check_ids.remote() for actor in actors_one_gpu]) - ray.get([actor.check_ids.remote() for actor in actors_no_gpus]) - - _broadcast_event("DRIVER_{}_DONE".format(driver_index), redis_address) - - -if __name__ == "__main__": - driver_index = int(os.environ["RAY_DRIVER_INDEX"]) - redis_address = os.environ["RAY_REDIS_ADDRESS"] - print("Driver {} started at {}.".format(driver_index, time.time())) - - if driver_index == 0: - driver_0(redis_address, driver_index) - elif driver_index == 1: - driver_1(redis_address, driver_index) - elif driver_index in [2, 3, 4, 5, 6]: - cleanup_driver(redis_address, driver_index) - else: - raise Exception("This code should be unreachable.") - - print("Driver {} finished at {}.".format(driver_index, time.time())) diff --git a/ci/jenkins_tests/multi_node_tests/test_0.py b/ci/jenkins_tests/multi_node_tests/test_0.py deleted file mode 100644 index 7d8240568ba6..000000000000 --- a/ci/jenkins_tests/multi_node_tests/test_0.py +++ /dev/null @@ -1,36 +0,0 @@ -from __future__ import absolute_import -from __future__ import division -from __future__ import print_function - -import os -import time - -import ray - - -@ray.remote -def f(): - time.sleep(0.1) - return ray.services.get_node_ip_address() - - -if __name__ == "__main__": - driver_index = int(os.environ["RAY_DRIVER_INDEX"]) - redis_address = os.environ["RAY_REDIS_ADDRESS"] - print("Driver {} started at {}.".format(driver_index, time.time())) - - ray.init(redis_address=redis_address) - # Check that tasks are scheduled on all nodes. - num_attempts = 30 - for i in range(num_attempts): - ip_addresses = ray.get([f.remote() for i in range(1000)]) - distinct_addresses = set(ip_addresses) - counts = [ - ip_addresses.count(address) for address in distinct_addresses - ] - print("Counts are {}".format(counts)) - if len(counts) == 5: - break - assert len(counts) == 5 - - print("Driver {} finished at {}.".format(driver_index, time.time())) diff --git a/ci/jenkins_tests/run_multi_node_tests.sh b/ci/jenkins_tests/run_multi_node_tests.sh index 263c2343d9fe..28f805984673 100755 --- a/ci/jenkins_tests/run_multi_node_tests.sh +++ b/ci/jenkins_tests/run_multi_node_tests.sh @@ -51,32 +51,5 @@ $SUPPRESS_OUTPUT docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} ######################## RAY BACKEND TESTS ################################# -$SUPPRESS_OUTPUT python3 $ROOT_DIR/multi_node_docker_test.py \ - --docker-image=$DOCKER_SHA \ - --num-nodes=5 \ - --num-redis-shards=10 \ - --test-script=/ray/ci/jenkins_tests/multi_node_tests/test_0.py - -$SUPPRESS_OUTPUT python3 $ROOT_DIR/multi_node_docker_test.py \ - --docker-image=$DOCKER_SHA \ - --num-nodes=5 \ - --num-redis-shards=5 \ - --num-gpus=0,1,2,3,4 \ - --num-drivers=7 \ - --driver-locations=0,1,0,1,2,3,4 \ - --test-script=/ray/ci/jenkins_tests/multi_node_tests/remove_driver_test.py - -$SUPPRESS_OUTPUT python3 $ROOT_DIR/multi_node_docker_test.py \ - --docker-image=$DOCKER_SHA \ - --num-nodes=5 \ - --num-redis-shards=2 \ - --num-gpus=0,0,5,6,50 \ - --num-drivers=100 \ - --test-script=/ray/ci/jenkins_tests/multi_node_tests/many_drivers_test.py - -$SUPPRESS_OUTPUT python3 $ROOT_DIR/multi_node_docker_test.py \ - --docker-image=$DOCKER_SHA \ - --num-nodes=1 \ - --mem-size=60G \ - --shm-size=60G \ - --test-script=/ray/ci/jenkins_tests/multi_node_tests/large_memory_test.py +$SUPPRESS_OUTPUT docker run --rm --shm-size=60G --memory=60G $DOCKER_SHA \ + python /ray/ci/jenkins_tests/miscellaneous/large_memory_test.py diff --git 
a/ci/long_running_tests/workloads/many_drivers.py b/ci/long_running_tests/workloads/many_drivers.py new file mode 100644 index 000000000000..b3fb4d1ee296 --- /dev/null +++ b/ci/long_running_tests/workloads/many_drivers.py @@ -0,0 +1,105 @@ +# This workload tests many drivers using the same cluster. + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import time + +import ray +from ray.tests.cluster_utils import Cluster +from ray.tests.utils import run_string_as_driver + +num_redis_shards = 5 +redis_max_memory = 10**8 +object_store_memory = 10**8 +num_nodes = 4 + +message = ("Make sure there is enough memory on this machine to run this " + "workload. We divide the system memory by 2 to provide a buffer.") +assert (num_nodes * object_store_memory + num_redis_shards * redis_max_memory < + ray.utils.get_system_memory() / 2) + +# Simulate a cluster on one machine. + +cluster = Cluster() +for i in range(num_nodes): + cluster.add_node( + redis_port=6379 if i == 0 else None, + num_redis_shards=num_redis_shards if i == 0 else None, + num_cpus=4, + num_gpus=0, + resources={str(i): 5}, + object_store_memory=object_store_memory, + redis_max_memory=redis_max_memory) +ray.init(redis_address=cluster.redis_address) + +# Run the workload. + +# Define a driver script that runs a few tasks and actors on each node in the +# cluster. +driver_script = """ +import ray + +ray.init(redis_address="{}") + +num_nodes = {} + + +@ray.remote +def f(): + return 1 + + +@ray.remote +class Actor(object): + def method(self): + return 1 + + +for _ in range(5): + for i in range(num_nodes): + assert (ray.get( + f._remote(args=[], kwargs={{}}, resources={{str(i): 1}})) == 1) + actor = Actor._remote(args=[], kwargs={{}}, resources={{str(i): 1}}) + assert ray.get(actor.method.remote()) == 1 + +print("success") +""".format(cluster.redis_address, num_nodes) + + +@ray.remote +def run_driver(): + output = run_string_as_driver(driver_script) + assert "success" in output + + +iteration = 0 +running_ids = [ + run_driver._remote( + args=[], kwargs={}, num_cpus=0, resources={str(i): 0.01}) + for i in range(num_nodes) +] +start_time = time.time() +previous_time = start_time +while True: + # Wait for a driver to finish and start a new driver. + [ready_id], running_ids = ray.wait(running_ids, num_returns=1) + ray.get(ready_id) + + running_ids.append( + run_driver._remote( + args=[], + kwargs={}, + num_cpus=0, + resources={str(iteration % num_nodes): 0.01})) + + new_time = time.time() + print("Iteration {}:\n" + " - Iteration time: {}.\n" + " - Absolute time: {}.\n" + " - Total elapsed time: {}.".format( + iteration, new_time - previous_time, new_time, + new_time - start_time)) + previous_time = new_time + iteration += 1 diff --git a/python/ray/tests/utils.py b/python/ray/tests/utils.py index 189f9ae35f58..e4249f89a9cb 100644 --- a/python/ray/tests/utils.py +++ b/python/ray/tests/utils.py @@ -2,9 +2,7 @@ from __future__ import division from __future__ import print_function -import json import os -import redis import subprocess import sys import tempfile @@ -12,92 +10,6 @@ import ray -EVENT_KEY = "RAY_MULTI_NODE_TEST_KEY" -"""This key is used internally within this file for coordinating drivers.""" - - -def _wait_for_nodes_to_join(num_nodes, timeout=20): - """Wait until the nodes have joined the cluster. - - This will wait until exactly num_nodes have joined the cluster. - - Args: - num_nodes: The number of nodes to wait for. 
- timeout: The amount of time in seconds to wait before failing. - - Raises: - Exception: An exception is raised if too many nodes join the cluster or - if the timeout expires while we are waiting. - """ - start_time = time.time() - while time.time() - start_time < timeout: - client_table = ray.global_state.client_table() - num_ready_nodes = len(client_table) - if num_ready_nodes == num_nodes: - return - if num_ready_nodes > num_nodes: - # Too many nodes have joined. Something must be wrong. - raise Exception("{} nodes have joined the cluster, but we were " - "expecting {} nodes.".format( - num_ready_nodes, num_nodes)) - time.sleep(0.1) - - # If we get here then we timed out. - raise Exception("Timed out while waiting for {} nodes to join. Only {} " - "nodes have joined so far.".format(num_ready_nodes, - num_nodes)) - - -def _broadcast_event(event_name, redis_address, data=None): - """Broadcast an event. - - This is used to synchronize drivers for the multi-node tests. - - Args: - event_name: The name of the event to wait for. - redis_address: The address of the Redis server to use for - synchronization. - data: Extra data to include in the broadcast (this will be returned by - the corresponding _wait_for_event call). This data must be json - serializable. - """ - redis_host, redis_port = redis_address.split(":") - redis_client = redis.StrictRedis(host=redis_host, port=int(redis_port)) - payload = json.dumps((event_name, data)) - redis_client.rpush(EVENT_KEY, payload) - - -def _wait_for_event(event_name, redis_address, extra_buffer=0): - """Block until an event has been broadcast. - - This is used to synchronize drivers for the multi-node tests. - - Args: - event_name: The name of the event to wait for. - redis_address: The address of the Redis server to use for - synchronization. - extra_buffer: An amount of time in seconds to wait after the event. - - Returns: - The data that was passed into the corresponding _broadcast_event call. - """ - redis_host, redis_port = redis_address.split(":") - redis_client = redis.StrictRedis(host=redis_host, port=int(redis_port)) - while True: - event_infos = redis_client.lrange(EVENT_KEY, 0, -1) - events = {} - for event_info in event_infos: - name, data = json.loads(event_info) - if name in events: - raise Exception("The same event {} was broadcast twice." - .format(name)) - events[name] = data - if event_name in events: - # Potentially sleep a little longer and then return the event data. - time.sleep(extra_buffer) - return events[event_name] - time.sleep(0.1) - def _pid_alive(pid): """Check if the process with this PID is alive or not.
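Note: the helpers deleted from python/ray/tests/utils.py above (_broadcast_event, _wait_for_event, and the EVENT_KEY list) were how the old multi-node Docker tests coordinated independent driver processes through a shared Redis list. For readers who want that pattern in one place, below is a minimal, self-contained sketch of the same mechanism. It assumes the redis-py client and a Redis server reachable at "<host>:<port>"; the function names are illustrative stand-ins for, not re-exports of, the deleted helpers.

    # Minimal sketch of the Redis-based driver coordination removed above.
    # Assumes the redis-py package and a reachable Redis server; names
    # mirror, but are not, the deleted _broadcast_event/_wait_for_event.
    import json
    import time

    import redis

    EVENT_KEY = "RAY_MULTI_NODE_TEST_KEY"  # shared list key used by all drivers


    def broadcast_event(event_name, redis_address, data=None):
        # Append a JSON-encoded (name, data) pair to the shared Redis list.
        host, port = redis_address.split(":")
        client = redis.StrictRedis(host=host, port=int(port))
        client.rpush(EVENT_KEY, json.dumps((event_name, data)))


    def wait_for_event(event_name, redis_address, poll_interval=0.1):
        # Poll the shared list until the named event appears, then return
        # whatever data the broadcaster attached to it.
        host, port = redis_address.split(":")
        client = redis.StrictRedis(host=host, port=int(port))
        while True:
            for raw in client.lrange(EVENT_KEY, 0, -1):
                name, data = json.loads(raw)
                if name == event_name:
                    return data
            time.sleep(poll_interval)

The new ci/long_running_tests/workloads/many_drivers.py workload added in this patch avoids this out-of-band channel entirely: each driver is launched with run_string_as_driver against a simulated Cluster and success is asserted from its printed output, so no shared Redis list is needed for synchronization.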