diff --git a/doc/source/redis-memory-management.rst b/doc/source/redis-memory-management.rst
index 91b207db54e1..5e6edcc02f6c 100644
--- a/doc/source/redis-memory-management.rst
+++ b/doc/source/redis-memory-management.rst
@@ -7,92 +7,9 @@ servers, as described in `An Overview of the Internals
 task/object generation rate could risk high memory pressure, potentially
 leading to out-of-memory (OOM) errors.
 
-Here, we describe an experimental feature that transparently flushes metadata
-entries out of Redis memory.
+In Ray ``0.6.1+``, Redis shards can be configured to LRU-evict task and object
+metadata by setting ``redis_max_memory`` when starting Ray. This supersedes the
+previously documented flushing functionality.
 
-Requirements
-------------
-
-As of early July 2018, the automatic memory management feature requires building
-Ray from source. We are planning on eliminating this step in the near future by
-releasing official wheels.
-
-Building Ray
-~~~~~~~~~~~~
-
-First, follow `instructions to build Ray from source
-`__ to install prerequisites. After
-the prerequisites are installed, instead of doing the regular ``pip install`` as
-referenced in that document, pass an additional special flag,
-``RAY_USE_NEW_GCS=on``:
-
-.. code-block:: bash
-
-    git clone https://github.com/ray-project/ray.git
-    cd ray/python
-    RAY_USE_NEW_GCS=on pip install -e . --verbose # Add --user if you see a permission denied error.
-
-Running Ray applications
-~~~~~~~~~~~~~~~~~~~~~~~~
-
-At run time the environment variables ``RAY_USE_NEW_GCS=on`` and
-``RAY_USE_XRAY=1`` are required.
-
-.. code-block:: bash
-
-    export RAY_USE_NEW_GCS=on
-    export RAY_USE_XRAY=1
-    python my_ray_script.py # Or launch python/ipython.
-
-Activate memory flushing
-------------------------
-
-After building Ray using the method above, simply add these two lines after
-``ray.init()`` to activate automatic memory flushing:
-
-.. code-block:: python
-
-    ray.init(...)
-
-    policy = ray.experimental.SimpleGcsFlushPolicy()
-    ray.experimental.set_flushing_policy(policy)
-
-    # My awesome Ray application logic follows.
-
-Paramaters of the flushing policy
----------------------------------
-
-There are three `user-configurable parameters
-`_
-of the ``SimpleGcsFlushPolicy``:
-
-* ``flush_when_at_least_bytes``: Wait until this many bytes of memory usage
-  accumulated in the redis server before flushing kicks in.
-* ``flush_period_secs``: Issue a flush to the Redis server every this many
-  seconds.
-* ``flush_num_entries_each_time``: A hint to the system on the number of entries
-  to flush on each request.
-
-The default values should serve to be non-invasive for lightweight Ray
-applications. ``flush_when_at_least_bytes`` is set to ``(1<<31)`` or 2GB,
-``flush_period_secs`` to 10, and ``flush_num_entries_each_time`` to 10000:
-
-.. code-block:: python
-
-    # Default parameters.
-    ray.experimental.SimpleGcsFlushPolicy(
-        flush_when_at_least_bytes=(1 << 31),
-        flush_period_secs=10,
-        flush_num_entries_each_time=10000)
-
-In particular, these default values imply that
-
-1. the Redis server would accumulate memory usage up to 2GB without any entries
-being flushed, then the flushing would kick in; and
-
-2. generally, "older" metadata entries would be flushed first, and the Redis
-server would always keep the most recent window of metadata of 2GB in size.
-
-**For advanced users.** Advanced users can tune the above parameters to their
-applications' needs; note that the desired flush rate is equal to (flush
-period) * (num entries each flush).
+Note that profiling is disabled when ``redis_max_memory`` is set. This is because
+profiling data cannot be LRU evicted.
diff --git a/python/ray/memory_monitor.py b/python/ray/memory_monitor.py
index 00cf86816dbf..a52f98d7077d 100644
--- a/python/ray/memory_monitor.py
+++ b/python/ray/memory_monitor.py
@@ -37,7 +37,8 @@ def get_message(used_gb, total_gb, threshold):
                 round(psutil.virtual_memory().shared / 1e9, 2)) +
             "currently being used by the Ray object store. You can set "
             "the object store size with the `object_store_memory` "
-            "parameter when starting Ray.")
+            "parameter when starting Ray, and the max Redis size with "
+            "`redis_max_memory`.")
 
 
 class MemoryMonitor(object):
diff --git a/python/ray/profiling.py b/python/ray/profiling.py
index 42b02f8926be..8cdd8296ed61 100644
--- a/python/ray/profiling.py
+++ b/python/ray/profiling.py
@@ -128,6 +128,19 @@ def add_event(self, event):
             self.events.append(event)
 
 
+class NoopProfiler(object):
+    """A no-op profiler used when collect_profiling_data=False."""
+
+    def start_flush_thread(self):
+        pass
+
+    def flush_profile_data(self):
+        pass
+
+    def add_event(self, event):
+        pass
+
+
 class RayLogSpanRaylet(object):
     """An object used to enable logging a span of events with a with
     statement.
diff --git a/python/ray/rllib/train.py b/python/ray/rllib/train.py
index a1d8e13a1f57..72d6fc0b58c3 100755
--- a/python/ray/rllib/train.py
+++ b/python/ray/rllib/train.py
@@ -38,30 +38,33 @@ def create_parser(parser_creator=None):
         "--redis-address",
         default=None,
         type=str,
-        help="The Redis address of the cluster.")
+        help="Connect to an existing Ray cluster at this address instead "
+        "of starting a new one.")
     parser.add_argument(
         "--ray-num-cpus",
         default=None,
         type=int,
-        help="--num-cpus to pass to Ray."
-        " This only has an affect in local mode.")
+        help="--num-cpus to use if starting a new cluster.")
     parser.add_argument(
         "--ray-num-gpus",
         default=None,
         type=int,
-        help="--num-gpus to pass to Ray."
-        " This only has an affect in local mode.")
+        help="--num-gpus to use if starting a new cluster.")
     parser.add_argument(
         "--ray-num-local-schedulers",
         default=None,
         type=int,
         help="Emulate multiple cluster nodes for debugging.")
+    parser.add_argument(
+        "--ray-redis-max-memory",
+        default=None,
+        type=int,
+        help="--redis-max-memory to use if starting a new cluster.")
     parser.add_argument(
         "--ray-object-store-memory",
         default=None,
         type=int,
-        help="--object-store-memory to pass to Ray."
- " This only has an affect in local mode.") + help="--object-store-memory to use if starting a new cluster.") parser.add_argument( "--experiment-name", default="default", @@ -122,12 +125,14 @@ def run(args, parser): "num_cpus": args.ray_num_cpus or 1, "num_gpus": args.ray_num_gpus or 0, }, - object_store_memory=args.ray_object_store_memory) + object_store_memory=args.ray_object_store_memory, + redis_max_memory=args.ray_redis_max_memory) ray.init(redis_address=cluster.redis_address) else: ray.init( redis_address=args.redis_address, object_store_memory=args.ray_object_store_memory, + redis_max_memory=args.ray_redis_max_memory, num_cpus=args.ray_num_cpus, num_gpus=args.ray_num_gpus) run_experiments( diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 60b05e4226d2..b84db6757c86 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -116,6 +116,22 @@ def cli(logging_level, logging_format): type=int, help="the maximum amount of memory (in bytes) to allow the " "object store to use") +@click.option( + "--redis-max-memory", + required=False, + type=int, + help=("The max amount of memory (in bytes) to allow redis to use, or None " + "for no limit. Once the limit is exceeded, redis will start LRU " + "eviction of entries. This only applies to the sharded " + "redis tables (task and object tables).")) +@click.option( + "--collect-profiling-data", + default=True, + type=bool, + help=("Whether to collect profiling data. Note that " + "profiling data cannot be LRU evicted, so if you set " + "redis_max_memory then profiling will also be disabled to prevent " + "it from consuming all available redis memory.")) @click.option( "--num-workers", required=False, @@ -202,11 +218,11 @@ def cli(logging_level, logging_format): def start(node_ip_address, redis_address, redis_port, num_redis_shards, redis_max_clients, redis_password, redis_shard_ports, object_manager_port, node_manager_port, object_store_memory, - num_workers, num_cpus, num_gpus, resources, head, no_ui, block, - plasma_directory, huge_pages, autoscaling_config, - no_redirect_worker_output, no_redirect_output, - plasma_store_socket_name, raylet_socket_name, temp_dir, - internal_config): + redis_max_memory, collect_profiling_data, num_workers, num_cpus, + num_gpus, resources, head, no_ui, block, plasma_directory, + huge_pages, autoscaling_config, no_redirect_worker_output, + no_redirect_output, plasma_store_socket_name, raylet_socket_name, + temp_dir, internal_config): # Convert hostnames to numerical IP address. if node_ip_address is not None: node_ip_address = services.address_to_ip(node_ip_address) @@ -262,6 +278,8 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, redis_port=redis_port, redis_shard_ports=redis_shard_ports, object_store_memory=object_store_memory, + redis_max_memory=redis_max_memory, + collect_profiling_data=collect_profiling_data, num_workers=num_workers, cleanup=False, redirect_worker_output=not no_redirect_worker_output, diff --git a/python/ray/services.py b/python/ray/services.py index c2292484a6e4..e96196b5f946 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -416,7 +416,8 @@ def start_redis(node_ip_address, redirect_worker_output=False, cleanup=True, password=None, - use_credis=None): + use_credis=None, + redis_max_memory=None): """Start the Redis global state store. Args: @@ -445,6 +446,10 @@ def start_redis(node_ip_address, use_credis: If True, additionally load the chain-replicated libraries into the redis servers. 
             Defaults to None, which means its value is set by the presence
             of "RAY_USE_NEW_GCS" in os.environ.
+        redis_max_memory: The max amount of memory (in bytes) to allow redis
+            to use, or None for no limit. Once the limit is exceeded, redis
+            will start LRU eviction of entries. This only applies to the
+            sharded redis tables (task and object tables).
 
     Returns:
         A tuple of the address for the primary Redis shard and a list of
@@ -475,7 +480,8 @@ def start_redis(node_ip_address,
             stdout_file=redis_stdout_file,
             stderr_file=redis_stderr_file,
             cleanup=cleanup,
-            password=password)
+            password=password,
+            redis_max_memory=None)
     else:
         assigned_port, _ = _start_redis_instance(
             node_ip_address=node_ip_address,
@@ -489,7 +495,8 @@ def start_redis(node_ip_address,
             # as the latter contains an extern declaration that the former
             # supplies.
             modules=[CREDIS_MASTER_MODULE, REDIS_MODULE],
-            password=password)
+            password=password,
+            redis_max_memory=None)
     if port is not None:
         assert assigned_port == port
     port = assigned_port
@@ -523,7 +530,8 @@ def start_redis(node_ip_address,
                 stdout_file=redis_stdout_file,
                 stderr_file=redis_stderr_file,
                 cleanup=cleanup,
-                password=password)
+                password=password,
+                redis_max_memory=redis_max_memory)
         else:
             assert num_redis_shards == 1, \
                 "For now, RAY_USE_NEW_GCS supports 1 shard, and credis "\
@@ -540,7 +548,8 @@ def start_redis(node_ip_address,
                 # It is important to load the credis module BEFORE the ray
                 # module, as the latter contains an extern declaration that the
                 # former supplies.
-                modules=[CREDIS_MEMBER_MODULE, REDIS_MODULE])
+                modules=[CREDIS_MEMBER_MODULE, REDIS_MODULE],
+                redis_max_memory=redis_max_memory)
 
         if redis_shard_ports[i] is not None:
             assert redis_shard_port == redis_shard_ports[i]
@@ -570,7 +579,8 @@ def _start_redis_instance(node_ip_address="127.0.0.1",
                           cleanup=True,
                           password=None,
                           executable=REDIS_EXECUTABLE,
-                          modules=None):
+                          modules=None,
+                          redis_max_memory=None):
     """Start a single Redis server.
 
     Args:
@@ -594,6 +604,9 @@ def _start_redis_instance(node_ip_address="127.0.0.1",
         modules (list of str): A list of pathnames, pointing to the redis
             module(s) that will be loaded in this redis server. If None, load
             the default Ray redis module.
+        redis_max_memory: The max amount of memory (in bytes) to allow redis
+            to use, or None for no limit. Once the limit is exceeded, redis
+            will start LRU eviction of entries.
 
     Returns:
         A tuple of the port used by Redis and a handle to the process that was
@@ -657,6 +670,14 @@ def _start_redis_instance(node_ip_address="127.0.0.1",
     # hosts can connect to it. TODO(rkn): Do this in a more secure way.
     redis_client.config_set("protected-mode", "no")
 
+    # Discard old task and object metadata.
+    if redis_max_memory is not None:
+        redis_client.config_set("maxmemory", str(redis_max_memory))
+        redis_client.config_set("maxmemory-policy", "allkeys-lru")
+        redis_client.config_set("maxmemory-samples", "10")
+        logger.info("Starting Redis shard with {} GB max memory.".format(
+            round(redis_max_memory / 1e9, 2)))
+
     # If redis_max_clients is provided, attempt to raise the number of maximum
     # number of Redis clients.
     if redis_max_clients is not None:
@@ -861,7 +882,8 @@ def start_raylet(redis_address,
                  stderr_file=None,
                  cleanup=True,
                  config=None,
-                 redis_password=None):
+                 redis_password=None,
+                 collect_profiling_data=True):
     """Start a raylet, which is a combined local scheduler and object manager.
 
     Args:
@@ -894,6 +916,7 @@ def start_raylet(redis_address,
         config (dict|None): Optional Raylet configuration that will override
             defaults in RayConfig.
         redis_password (str): The password of the redis server.
+        collect_profiling_data: Whether to collect profiling data from workers.
 
     Returns:
         The raylet socket name.
@@ -923,9 +946,11 @@ def start_raylet(redis_address,
                             "--object-store-name={} "
                             "--raylet-name={} "
                             "--redis-address={} "
+                            "--collect-profiling-data={} "
                             "--temp-dir={}".format(
                                 sys.executable, worker_path, node_ip_address,
                                 plasma_store_name, raylet_name, redis_address,
+                                "1" if collect_profiling_data else "0",
                                 get_temp_root()))
     if redis_password:
         start_worker_command += " --redis-password {}".format(redis_password)
@@ -1260,6 +1285,8 @@ def start_ray_processes(address_info=None,
                         num_workers=None,
                         num_local_schedulers=1,
                         object_store_memory=None,
+                        redis_max_memory=None,
+                        collect_profiling_data=True,
                         num_redis_shards=1,
                         redis_max_clients=None,
                         redis_password=None,
@@ -1306,6 +1333,14 @@ def start_ray_processes(address_info=None,
             address_info.
         object_store_memory: The amount of memory (in bytes) to start the
             object store with.
+        redis_max_memory: The max amount of memory (in bytes) to allow redis
+            to use, or None for no limit. Once the limit is exceeded, redis
+            will start LRU eviction of entries. This only applies to the
+            sharded redis tables (task and object tables).
+        collect_profiling_data: Whether to collect profiling data. Note that
+            profiling data cannot be LRU evicted, so if you set
+            redis_max_memory then profiling will also be disabled to prevent
+            it from consuming all available redis memory.
         num_redis_shards: The number of Redis shards to start in addition to
             the primary Redis shard.
         redis_max_clients: If provided, attempt to configure Redis with this
@@ -1397,7 +1432,8 @@ def start_ray_processes(address_info=None,
             redirect_output=True,
             redirect_worker_output=redirect_worker_output,
             cleanup=cleanup,
-            password=redis_password)
+            password=redis_password,
+            redis_max_memory=redis_max_memory)
         address_info["redis_address"] = redis_address
         time.sleep(0.1)
@@ -1497,6 +1533,7 @@ def start_ray_processes(address_info=None,
                 stderr_file=raylet_stderr_file,
                 cleanup=cleanup,
                 redis_password=redis_password,
+                collect_profiling_data=collect_profiling_data,
                 config=config))
 
     # Try to start the web UI.
@@ -1617,6 +1654,8 @@ def start_ray_head(address_info=None,
                    num_workers=None,
                    num_local_schedulers=1,
                    object_store_memory=None,
+                   redis_max_memory=None,
+                   collect_profiling_data=True,
                    worker_path=None,
                    cleanup=True,
                    redirect_worker_output=False,
@@ -1662,6 +1701,11 @@ def start_ray_head(address_info=None,
             address_info.
         object_store_memory: The amount of memory (in bytes) to start the
             object store with.
+        redis_max_memory: The max amount of memory (in bytes) to allow redis
+            to use, or None for no limit. Once the limit is exceeded, redis
+            will start LRU eviction of entries. This only applies to the
+            sharded redis tables (task and object tables).
+        collect_profiling_data: Whether to collect profiling data from workers.
         worker_path (str): The path of the source code that will be run by the
             worker.
         cleanup (bool): If cleanup is true, then the processes started here
@@ -1712,6 +1756,8 @@ def start_ray_head(address_info=None,
         num_workers=num_workers,
         num_local_schedulers=num_local_schedulers,
         object_store_memory=object_store_memory,
+        redis_max_memory=redis_max_memory,
+        collect_profiling_data=collect_profiling_data,
         worker_path=worker_path,
         cleanup=cleanup,
         redirect_worker_output=redirect_worker_output,
diff --git a/python/ray/worker.py b/python/ray/worker.py
index 844f35cc7dd1..de2513780ad5 100644
--- a/python/ray/worker.py
+++ b/python/ray/worker.py
@@ -217,7 +217,7 @@ def __init__(self):
         # When the worker is constructed. Record the original value of the
         # CUDA_VISIBLE_DEVICES environment variable.
         self.original_gpu_ids = ray.utils.get_cuda_visible_devices()
-        self.profiler = profiling.Profiler(self)
+        self.profiler = None
         self.memory_monitor = memory_monitor.MemoryMonitor()
         self.state_lock = threading.Lock()
         # A dictionary that maps from driver id to SerializationContext
@@ -1342,6 +1342,8 @@ def _init(address_info=None,
           num_workers=None,
           num_local_schedulers=None,
           object_store_memory=None,
+          redis_max_memory=None,
+          collect_profiling_data=True,
          local_mode=False,
          driver_mode=None,
          redirect_worker_output=False,
@@ -1386,6 +1388,11 @@ def _init(address_info=None,
             This is only provided if start_ray_local is True.
         object_store_memory: The maximum amount of memory (in bytes) to allow
             the object store to use.
+        redis_max_memory: The max amount of memory (in bytes) to allow redis
+            to use, or None for no limit. Once the limit is exceeded, redis
+            will start LRU eviction of entries. This only applies to the
+            sharded redis tables (task and object tables).
+        collect_profiling_data: Whether to collect profiling data from workers.
         local_mode (bool): True if the code should be executed serially
             without Ray. This is useful for debugging.
         redirect_worker_output: True if the stdout and stderr of worker
@@ -1440,6 +1447,11 @@ def _init(address_info=None,
     else:
         driver_mode = SCRIPT_MODE
 
+    if redis_max_memory and collect_profiling_data:
+        logger.warning("Profiling data cannot be LRU evicted, so it is "
+                       "disabled when redis_max_memory is set.")
+        collect_profiling_data = False
+
     # Get addresses of existing services.
     if address_info is None:
         address_info = {}
@@ -1479,6 +1491,8 @@ def _init(address_info=None,
             num_workers=num_workers,
             num_local_schedulers=num_local_schedulers,
             object_store_memory=object_store_memory,
+            redis_max_memory=redis_max_memory,
+            collect_profiling_data=collect_profiling_data,
             redirect_worker_output=redirect_worker_output,
             redirect_output=redirect_output,
             start_workers_from_local_scheduler=(
@@ -1519,6 +1533,9 @@ def _init(address_info=None,
         if object_store_memory is not None:
             raise Exception("When connecting to an existing cluster, "
                             "object_store_memory must not be provided.")
+        if redis_max_memory is not None:
+            raise Exception("When connecting to an existing cluster, "
+                            "redis_max_memory must not be provided.")
         if plasma_directory is not None:
             raise Exception("When connecting to an existing cluster, "
                             "plasma_directory must not be provided.")
@@ -1556,7 +1573,7 @@ def _init(address_info=None,
         "node_ip_address": node_ip_address,
         "redis_address": address_info["redis_address"],
         "store_socket_name": address_info["object_store_addresses"][0],
-        "webui_url": address_info["webui_url"]
+        "webui_url": address_info["webui_url"],
     }
     driver_address_info["raylet_socket_name"] = (
         address_info["raylet_socket_names"][0])
@@ -1569,7 +1586,8 @@ def _init(address_info=None,
         mode=driver_mode,
         worker=global_worker,
         driver_id=driver_id,
-        redis_password=redis_password)
+        redis_password=redis_password,
+        collect_profiling_data=collect_profiling_data)
     return address_info
 
 
@@ -1578,6 +1596,8 @@ def init(redis_address=None,
          num_gpus=None,
          resources=None,
          object_store_memory=None,
+         redis_max_memory=None,
+         collect_profiling_data=True,
          node_ip_address=None,
          object_id_seed=None,
          num_workers=None,
@@ -1634,6 +1654,11 @@ def init(redis_address=None,
             of that resource available.
         object_store_memory: The amount of memory (in bytes) to start the
             object store with.
+        redis_max_memory: The max amount of memory (in bytes) to allow redis
+            to use, or None for no limit. Once the limit is exceeded, redis
+            will start LRU eviction of entries. This only applies to the
+            sharded redis tables (task and object tables).
+        collect_profiling_data: Whether to collect profiling data from workers.
         node_ip_address (str): The IP address of the node that we are on.
         object_id_seed (int): Used to seed the deterministic generation of
             object IDs. The same value can be used across multiple runs of the
@@ -1734,6 +1759,8 @@ def init(redis_address=None,
         huge_pages=huge_pages,
         include_webui=include_webui,
         object_store_memory=object_store_memory,
+        redis_max_memory=redis_max_memory,
+        collect_profiling_data=collect_profiling_data,
         driver_id=driver_id,
         plasma_store_socket_name=plasma_store_socket_name,
         raylet_socket_name=raylet_socket_name,
@@ -1913,7 +1940,8 @@ def connect(info,
             mode=WORKER_MODE,
             worker=global_worker,
             driver_id=None,
-            redis_password=None):
+            redis_password=None,
+            collect_profiling_data=True):
     """Connect this worker to the local scheduler, to Plasma, and to Redis.
 
     Args:
@@ -1926,6 +1954,7 @@ def connect(info,
         driver_id: The ID of driver. If it's None, then we will generate one.
         redis_password (str): Prevents external clients without the password
             from connecting to Redis if provided.
+        collect_profiling_data: Whether to collect profiling data from workers.
     """
     # Do some basic checking to make sure we didn't call ray.init twice.
     error_message = "Perhaps you called ray.init twice by accident?"
@@ -1935,6 +1964,11 @@ def connect(info,
     # Enable nice stack traces on SIGSEGV etc.
     faulthandler.enable(all_threads=False)
 
+    if collect_profiling_data:
+        worker.profiler = profiling.Profiler(worker)
+    else:
+        worker.profiler = profiling.NoopProfiler()
+
     # Initialize some fields.
     if mode is WORKER_MODE:
         worker.worker_id = random_string()
diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py
index b9c9500e7087..dc1085783b8a 100644
--- a/python/ray/workers/default_worker.py
+++ b/python/ray/workers/default_worker.py
@@ -55,6 +55,11 @@
     type=str,
     default=ray_constants.LOGGER_FORMAT,
     help=ray_constants.LOGGER_FORMAT_HELP)
+parser.add_argument(
+    "--collect-profiling-data",
+    type=int,  # int since argparse can't handle bool values
+    default=1,
+    help="Whether to collect profiling data from workers.")
 parser.add_argument(
     "--temp-dir",
     required=False,
@@ -71,7 +76,7 @@
         "redis_password": args.redis_password,
         "store_socket_name": args.object_store_name,
         "manager_socket_name": args.object_store_manager_name,
-        "raylet_socket_name": args.raylet_name
+        "raylet_socket_name": args.raylet_name,
     }
 
     logging.basicConfig(
@@ -82,7 +87,10 @@
     tempfile_services.set_temp_root(args.temp_dir)
 
     ray.worker.connect(
-        info, mode=ray.WORKER_MODE, redis_password=args.redis_password)
+        info,
+        mode=ray.WORKER_MODE,
+        redis_password=args.redis_password,
+        collect_profiling_data=args.collect_profiling_data)
 
 error_explanation = """
 This error is unexpected and should not have happened. Somehow a worker
diff --git a/src/ray/ray_config.h b/src/ray/ray_config.h
index 480c976a5def..97767fc3f396 100644
--- a/src/ray/ray_config.h
+++ b/src/ray/ray_config.h
@@ -200,7 +200,7 @@ class RayConfig {
       : ray_protocol_version_(0x0000000000000000),
         handler_warning_timeout_ms_(100),
         heartbeat_timeout_milliseconds_(100),
-        num_heartbeats_timeout_(100),
+        num_heartbeats_timeout_(300),
         num_heartbeats_warning_(5),
         debug_dump_period_milliseconds_(10000),
         initial_reconstruction_timeout_milliseconds_(10000),
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index 9110f0c87881..aa49bcd3b95e 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -1543,7 +1543,12 @@ void NodeManager::HandleTaskReconstruction(const TaskID &task_id) {
       [this](ray::gcs::AsyncGcsClient *client, const TaskID &task_id) {
         // The task was not in the GCS task table. It must therefore be in the
         // lineage cache.
-        RAY_CHECK(lineage_cache_.ContainsTask(task_id));
+        RAY_CHECK(lineage_cache_.ContainsTask(task_id))
+            << "Task metadata not found in either GCS or lineage cache. It may "
+               "have been evicted by the redis LRU configuration. Consider "
+               "increasing the memory allocation via "
+               "ray.init(redis_max_memory=<max_memory_bytes>).";
         // Use a copy of the cached task spec to re-execute the task.
         const Task task = lineage_cache_.GetTask(task_id);
         ResubmitTask(task);
diff --git a/test/failure_test.py b/test/failure_test.py
index 027ed38d6411..2c4a92bd496c 100644
--- a/test/failure_test.py
+++ b/test/failure_test.py
@@ -3,6 +3,7 @@ from __future__ import print_function
 
 import numpy as np
+import json
 import os
 import ray
 import sys
@@ -570,7 +571,13 @@ class Foo(object):
 @pytest.fixture
 def ray_start_two_nodes():
     # Start the Ray processes.
-    ray.worker._init(start_ray_local=True, num_local_schedulers=2, num_cpus=0)
+    ray.worker._init(
+        start_ray_local=True,
+        num_local_schedulers=2,
+        num_cpus=0,
+        _internal_config=json.dumps({
+            "num_heartbeats_timeout": 40
+        }))
     yield None
     # The code after the yield will run as teardown code.
     ray.shutdown()
@@ -594,7 +601,7 @@ def test_warning_for_dead_node(ray_start_two_nodes):
     ray.services.all_processes[ray.services.PROCESS_TYPE_RAYLET][0].kill()
 
     # Check that we get warning messages for both raylets.
-    wait_for_errors(ray_constants.REMOVED_NODE_ERROR, 2, timeout=20)
+    wait_for_errors(ray_constants.REMOVED_NODE_ERROR, 2, timeout=40)
 
     # Extract the client IDs from the error messages. This will need to be
     # changed if the error message changes.
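Taken together, the changes above add a single user-facing knob. A minimal usage sketch of the new parameter (the 500 MB cap is an arbitrary illustration, not a recommended value):

```python
import ray

# Cap the sharded Redis tables (task and object metadata) at ~500 MB.
# Once the cap is reached, the shards LRU-evict old entries. Per the
# worker.py change above, profiling is then disabled automatically,
# since profiling data cannot be safely evicted.
ray.init(redis_max_memory=500 * 1000 * 1000)
```

From the command line, the equivalent knob is ``ray start --head --redis-max-memory 500000000``; RLlib's ``train.py`` exposes it as ``--ray-redis-max-memory``. In both cases the parameter is only valid when starting a new cluster, since connecting to an existing one now raises an exception if it is supplied.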
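The eviction mechanics reduce to the three ``config_set`` calls in ``_start_redis_instance``. Note that ``start_redis`` deliberately passes ``redis_max_memory=None`` for the primary shard, so only the shards holding the task and object tables are ever capped. A self-contained sketch of the same configuration against a standalone server (the host, port, and 1 GB value are placeholders; assumes the ``redis`` Python package):

```python
import redis

# Placeholder address for a Redis shard.
client = redis.StrictRedis(host="127.0.0.1", port=6379)

redis_max_memory = 10**9  # 1 GB, illustrative only.

# Mirror what _start_redis_instance does: bound total memory, evict the
# least-recently-used keys across the whole keyspace, and raise the
# sample size so Redis's approximated LRU tracks true LRU more closely.
client.config_set("maxmemory", str(redis_max_memory))
client.config_set("maxmemory-policy", "allkeys-lru")
client.config_set("maxmemory-samples", "10")
```

``allkeys-lru`` is also what makes the new ``RAY_CHECK`` message in ``node_manager.cc`` necessary: once the cap is hit, any key can be evicted, including task metadata still needed for reconstruction, so the raylet falls back to its lineage cache and fails loudly if the entry is gone from both.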
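The profiling switch uses a null-object pattern rather than a conditional at every call site: ``connect()`` installs either the real ``Profiler`` or the new ``NoopProfiler``, both exposing the same three methods. A condensed sketch of the selection (the ``make_profiler`` helper is hypothetical; in the diff the branch sits inline in ``ray.worker.connect()``):

```python
from ray import profiling

def make_profiler(worker, collect_profiling_data):
    # Hypothetical helper; both classes implement start_flush_thread,
    # flush_profile_data, and add_event, so event-recording call sites
    # never need to check whether profiling is enabled.
    if collect_profiling_data:
        return profiling.Profiler(worker)
    return profiling.NoopProfiler()
```

This is also why ``Worker.__init__`` now sets ``self.profiler = None``: the choice has to wait until ``connect()`` knows the value of ``collect_profiling_data``, which workers receive through the new ``--collect-profiling-data`` flag in ``default_worker.py``.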
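Finally, the ``ray_config.h`` change interacts with the test update. With ``heartbeat_timeout_milliseconds_`` at 100, raising ``num_heartbeats_timeout_`` from 100 to 300 moves node-failure detection from roughly 100 × 100 ms = 10 s to 300 × 100 ms = 30 s. The ``ray_start_two_nodes`` fixture therefore pins ``num_heartbeats_timeout`` back down to 40 (about 4 s) through ``_internal_config``, and ``test_warning_for_dead_node`` doubles its ``wait_for_errors`` timeout from 20 s to 40 s as extra margin, so the dead-node warnings still arrive within the test's window.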