From bb3983c605eb76e8a174a5795b907d6fd217e923 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 7 Dec 2018 17:47:03 -0800 Subject: [PATCH 01/16] wip --- python/ray/scripts/scripts.py | 22 ++++++++++++- python/ray/services.py | 58 ++++++++++++++++++++++++++++++---- python/ray/worker.py | 27 ++++++++++++++++ src/ray/raylet/node_manager.cc | 5 ++- 4 files changed, 103 insertions(+), 9 deletions(-) diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 60b05e4226d2..19bfe4f0ec4b 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -116,6 +116,23 @@ def cli(logging_level, logging_format): type=int, help="the maximum amount of memory (in bytes) to allow the " "object store to use") +@click.option( + "--redis-max-memory", + required=False, + type=int, + help=( + "The max amount of memory (in bytes) to allow redis to use, or None " + "for no limit. Once the limit is exceeded, redis will start LRU " + "eviction of entries. This only applies to the sharded " + "redis tables (task and object tables).")) +@click.option( + "--collect-profiling-data", + is_flag=True, + default=True, + help=("Whether to collect profiling data. 
Note that " + "profiling data cannot be LRU evicted, so if you set " + "redis_max_memory then profiling should also be disabled to prevent " + "it from consuming all available redis memory.")) @click.option( "--num-workers", required=False, @@ -202,7 +219,8 @@ def cli(logging_level, logging_format): def start(node_ip_address, redis_address, redis_port, num_redis_shards, redis_max_clients, redis_password, redis_shard_ports, object_manager_port, node_manager_port, object_store_memory, - num_workers, num_cpus, num_gpus, resources, head, no_ui, block, + redis_max_memory, collect_profiling_data, num_workers, num_cpus, + num_gpus, resources, head, no_ui, block, plasma_directory, huge_pages, autoscaling_config, no_redirect_worker_output, no_redirect_output, plasma_store_socket_name, raylet_socket_name, temp_dir, @@ -262,6 +280,8 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, redis_port=redis_port, redis_shard_ports=redis_shard_ports, object_store_memory=object_store_memory, + redis_max_memory=redis_max_memory, + collect_profiling_data=collect_profiling_data, num_workers=num_workers, cleanup=False, redirect_worker_output=not no_redirect_worker_output, diff --git a/python/ray/services.py b/python/ray/services.py index c2292484a6e4..8e25530b1aa6 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -416,7 +416,8 @@ def start_redis(node_ip_address, redirect_worker_output=False, cleanup=True, password=None, - use_credis=None): + use_credis=None, + redis_max_memory=None): """Start the Redis global state store. Args: @@ -445,6 +446,10 @@ def start_redis(node_ip_address, use_credis: If True, additionally load the chain-replicated libraries into the redis servers. Defaults to None, which means its value is set by the presence of "RAY_USE_NEW_GCS" in os.environ. + redis_max_memory: The max amount of memory (in bytes) to allow redis + to use, or None for no limit. Once the limit is exceeded, redis + will start LRU eviction of entries. 
This only applies to the + sharded redis tables (task and object tables). Returns: A tuple of the address for the primary Redis shard and a list of @@ -475,7 +480,8 @@ def start_redis(node_ip_address, stdout_file=redis_stdout_file, stderr_file=redis_stderr_file, cleanup=cleanup, - password=password) + password=password, + redis_max_memory=None) else: assigned_port, _ = _start_redis_instance( node_ip_address=node_ip_address, @@ -489,7 +495,8 @@ def start_redis(node_ip_address, # as the latter contains an extern declaration that the former # supplies. modules=[CREDIS_MASTER_MODULE, REDIS_MODULE], - password=password) + password=password, + redis_max_memory=None) if port is not None: assert assigned_port == port port = assigned_port @@ -523,7 +530,8 @@ def start_redis(node_ip_address, stdout_file=redis_stdout_file, stderr_file=redis_stderr_file, cleanup=cleanup, - password=password) + password=password, + redis_max_memory=redis_max_memory) else: assert num_redis_shards == 1, \ "For now, RAY_USE_NEW_GCS supports 1 shard, and credis "\ @@ -540,7 +548,8 @@ def start_redis(node_ip_address, # It is important to load the credis module BEFORE the ray # module, as the latter contains an extern declaration that the # former supplies. - modules=[CREDIS_MEMBER_MODULE, REDIS_MODULE]) + modules=[CREDIS_MEMBER_MODULE, REDIS_MODULE], + redis_max_memory=redis_max_memory) if redis_shard_ports[i] is not None: assert redis_shard_port == redis_shard_ports[i] @@ -570,7 +579,8 @@ def _start_redis_instance(node_ip_address="127.0.0.1", cleanup=True, password=None, executable=REDIS_EXECUTABLE, - modules=None): + modules=None, + redis_max_memory=None): """Start a single Redis server. Args: @@ -594,6 +604,9 @@ def _start_redis_instance(node_ip_address="127.0.0.1", modules (list of str): A list of pathnames, pointing to the redis module(s) that will be loaded in this redis server. If None, load the default Ray redis module. 
+ redis_max_memory: The max amount of memory (in bytes) to allow redis + to use, or None for no limit. Once the limit is exceeded, redis + will start LRU eviction of entries. Returns: A tuple of the port used by Redis and a handle to the process that was @@ -657,6 +670,14 @@ def _start_redis_instance(node_ip_address="127.0.0.1", # hosts can connect to it. TODO(rkn): Do this in a more secure way. redis_client.config_set("protected-mode", "no") + # Discard old task and object metadata. + if redis_max_memory is not None: + redis_client.config_set("maxmemory", str(redis_max_memory)) + redis_client.config_set("maxmemory-policy", "allkeys-lru") + redis_client.config_set("maxmemory-samples", "10") + logger.info("Starting Redis shard with {} GB max memory.".format( + round(redis_max_memory / 1e9, 2))) + # If redis_max_clients is provided, attempt to raise the number of maximum # number of Redis clients. if redis_max_clients is not None: @@ -1260,6 +1281,8 @@ def start_ray_processes(address_info=None, num_workers=None, num_local_schedulers=1, object_store_memory=None, + redis_max_memory=None, + collect_profiling_data=True, num_redis_shards=1, redis_max_clients=None, redis_password=None, @@ -1306,6 +1329,14 @@ def start_ray_processes(address_info=None, address_info. object_store_memory: The amount of memory (in bytes) to start the object store with. + redis_max_memory: The max amount of memory (in bytes) to allow redis + to use, or None for no limit. Once the limit is exceeded, redis + will start LRU eviction of entries. This only applies to the + sharded redis tables (task and object tables). + collect_profiling_data: Whether to collect profiling data. Note that + profiling data cannot be LRU evicted, so if you set + redis_max_memory then profiling should also be disabled to prevent + it from consuming all available redis memory. num_redis_shards: The number of Redis shards to start in addition to the primary Redis shard. 
redis_max_clients: If provided, attempt to configure Redis with this @@ -1397,7 +1428,8 @@ def start_ray_processes(address_info=None, redirect_output=True, redirect_worker_output=redirect_worker_output, cleanup=cleanup, - password=redis_password) + password=redis_password, + redis_max_memory=redis_max_memory) address_info["redis_address"] = redis_address time.sleep(0.1) @@ -1617,6 +1649,8 @@ def start_ray_head(address_info=None, num_workers=None, num_local_schedulers=1, object_store_memory=None, + redis_max_memory=None, + collect_profiling_data=True, worker_path=None, cleanup=True, redirect_worker_output=False, @@ -1662,6 +1696,14 @@ def start_ray_head(address_info=None, address_info. object_store_memory: The amount of memory (in bytes) to start the object store with. + redis_max_memory: The max amount of memory (in bytes) to allow redis + to use, or None for no limit. Once the limit is exceeded, redis + will start LRU eviction of entries. This only applies to the + sharded redis tables (task and object tables). + collect_profiling_data: Whether to collect profiling data. Note that + profiling data cannot be LRU evicted, so if you set + redis_max_memory then profiling should also be disabled to prevent + it from consuming all available redis memory. worker_path (str): The path of the source code that will be run by the worker. 
cleanup (bool): If cleanup is true, then the processes started here @@ -1712,6 +1754,8 @@ def start_ray_head(address_info=None, num_workers=num_workers, num_local_schedulers=num_local_schedulers, object_store_memory=object_store_memory, + redis_max_memory=redis_max_memory, + collect_profiling_data=collect_profiling_data, worker_path=worker_path, cleanup=cleanup, redirect_worker_output=redirect_worker_output, diff --git a/python/ray/worker.py b/python/ray/worker.py index 844f35cc7dd1..b14bb782f2c7 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1342,6 +1342,8 @@ def _init(address_info=None, num_workers=None, num_local_schedulers=None, object_store_memory=None, + redis_max_memory=None, + collect_profiling_data=True, local_mode=False, driver_mode=None, redirect_worker_output=False, @@ -1386,6 +1388,14 @@ def _init(address_info=None, This is only provided if start_ray_local is True. object_store_memory: The maximum amount of memory (in bytes) to allow the object store to use. + redis_max_memory: The max amount of memory (in bytes) to allow redis + to use, or None for no limit. Once the limit is exceeded, redis + will start LRU eviction of entries. This only applies to the + sharded redis tables (task and object tables). + collect_profiling_data: Whether to collect profiling data. Note that + profiling data cannot be LRU evicted, so if you set + redis_max_memory then profiling should also be disabled to prevent + it from consuming all available redis memory. local_mode (bool): True if the code should be executed serially without Ray. This is useful for debugging. 
redirect_worker_output: True if the stdout and stderr of worker @@ -1479,6 +1489,8 @@ def _init(address_info=None, num_workers=num_workers, num_local_schedulers=num_local_schedulers, object_store_memory=object_store_memory, + redis_max_memory=redis_max_memory, + collect_profiling_data=collect_profiling_data, redirect_worker_output=redirect_worker_output, redirect_output=redirect_output, start_workers_from_local_scheduler=( @@ -1519,6 +1531,9 @@ def _init(address_info=None, if object_store_memory is not None: raise Exception("When connecting to an existing cluster, " "object_store_memory must not be provided.") + if redis_max_memory is not None: + raise Exception("When connecting to an existing cluster, " + "redis_max_memory must not be provided.") if plasma_directory is not None: raise Exception("When connecting to an existing cluster, " "plasma_directory must not be provided.") @@ -1578,6 +1593,8 @@ def init(redis_address=None, num_gpus=None, resources=None, object_store_memory=None, + redis_max_memory=None, + collect_profiling_data=True, node_ip_address=None, object_id_seed=None, num_workers=None, @@ -1634,6 +1651,14 @@ def init(redis_address=None, of that resource available. object_store_memory: The amount of memory (in bytes) to start the object store with. + redis_max_memory: The max amount of memory (in bytes) to allow redis + to use, or None for no limit. Once the limit is exceeded, redis + will start LRU eviction of entries. This only applies to the + sharded redis tables (task and object tables). + collect_profiling_data: Whether to collect profiling data. Note that + profiling data cannot be LRU evicted, so if you set + redis_max_memory then profiling should also be disabled to prevent + it from consuming all available redis memory. node_ip_address (str): The IP address of the node that we are on. object_id_seed (int): Used to seed the deterministic generation of object IDs. 
The same value can be used across multiple runs of the @@ -1734,6 +1759,8 @@ def init(redis_address=None, huge_pages=huge_pages, include_webui=include_webui, object_store_memory=object_store_memory, + redis_max_memory=redis_max_memory, + collect_profiling_data=collect_profiling_data, driver_id=driver_id, plasma_store_socket_name=plasma_store_socket_name, raylet_socket_name=raylet_socket_name, diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 9110f0c87881..26bec4283afe 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1543,7 +1543,10 @@ void NodeManager::HandleTaskReconstruction(const TaskID &task_id) { [this](ray::gcs::AsyncGcsClient *client, const TaskID &task_id) { // The task was not in the GCS task table. It must therefore be in the // lineage cache. - RAY_CHECK(lineage_cache_.ContainsTask(task_id)); + RAY_CHECK(lineage_cache_.ContainsTask(task_id)) << + "Task metadata not found in either GCS or lineage cache. It may have been evicted " << + "by the redis LRU configuration. Consider increasing the memory allocation via " << + "ray.init(redis_max_memory=)."; // Use a copy of the cached task spec to re-execute the task. 
const Task task = lineage_cache_.GetTask(task_id); ResubmitTask(task); From ced0b49b616a1cdb006ef8be040be8e64c03eae7 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 7 Dec 2018 18:00:59 -0800 Subject: [PATCH 02/16] wip --- python/ray/scripts/scripts.py | 4 ++-- python/ray/services.py | 21 ++++++++++++++------- python/ray/worker.py | 10 ++-------- python/ray/workers/default_worker.py | 5 +++++ 4 files changed, 23 insertions(+), 17 deletions(-) diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 19bfe4f0ec4b..298270eb2040 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -127,11 +127,11 @@ def cli(logging_level, logging_format): "redis tables (task and object tables).")) @click.option( "--collect-profiling-data", - is_flag=True, default=True, + type=bool, help=("Whether to collect profiling data. Note that " "profiling data cannot be LRU evicted, so if you set " - "redis_max_memory then profiling should also be disabled to prevent " + "redis_max_memory then profiling will also be disabled to prevent " "it from consuming all available redis memory.")) @click.option( "--num-workers", diff --git a/python/ray/services.py b/python/ray/services.py index 8e25530b1aa6..690364155a04 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -882,7 +882,8 @@ def start_raylet(redis_address, stderr_file=None, cleanup=True, config=None, - redis_password=None): + redis_password=None, + collect_profiling_data=True): """Start a raylet, which is a combined local scheduler and object manager. Args: @@ -915,6 +916,7 @@ def start_raylet(redis_address, config (dict|None): Optional Raylet configuration that will override defaults in RayConfig. redis_password (str): The password of the redis server. + collect_profiling_data: Whether to collect profiling data from workers. Returns: The raylet socket name. 
@@ -944,10 +946,11 @@ def start_raylet(redis_address, "--object-store-name={} " "--raylet-name={} " "--redis-address={} " + "--collect-profiling-data={} " "--temp-dir={}".format( sys.executable, worker_path, node_ip_address, plasma_store_name, raylet_name, redis_address, - get_temp_root())) + collect_profiling_data, get_temp_root())) if redis_password: start_worker_command += " --redis-password {}".format(redis_password) @@ -1335,7 +1338,7 @@ def start_ray_processes(address_info=None, sharded redis tables (task and object tables). collect_profiling_data: Whether to collect profiling data. Note that profiling data cannot be LRU evicted, so if you set - redis_max_memory then profiling should also be disabled to prevent + redis_max_memory then profiling will also be disabled to prevent it from consuming all available redis memory. num_redis_shards: The number of Redis shards to start in addition to the primary Redis shard. @@ -1393,6 +1396,12 @@ def start_ray_processes(address_info=None, if not isinstance(resources, list): resources = num_local_schedulers * [resources] + if redis_max_memory and collect_profiling_data: + logger.warn( + "Profiling data cannot be LRU evicted, so it is disabled " + "when redis_max_memory is set.") + collect_profiling_data = False + if num_workers is not None: raise Exception("The 'num_workers' argument is deprecated. Please use " "'num_cpus' instead.") @@ -1529,6 +1538,7 @@ def start_ray_processes(address_info=None, stderr_file=raylet_stderr_file, cleanup=cleanup, redis_password=redis_password, + collect_profiling_data=collect_profiling_data, config=config)) # Try to start the web UI. @@ -1700,10 +1710,7 @@ def start_ray_head(address_info=None, to use, or None for no limit. Once the limit is exceeded, redis will start LRU eviction of entries. This only applies to the sharded redis tables (task and object tables). - collect_profiling_data: Whether to collect profiling data. 
Note that - profiling data cannot be LRU evicted, so if you set - redis_max_memory then profiling should also be disabled to prevent - it from consuming all available redis memory. + collect_profiling_data: Whether to collect profiling data from workers. worker_path (str): The path of the source code that will be run by the worker. cleanup (bool): If cleanup is true, then the processes started here diff --git a/python/ray/worker.py b/python/ray/worker.py index b14bb782f2c7..c70acd0d5f34 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1392,10 +1392,7 @@ def _init(address_info=None, to use, or None for no limit. Once the limit is exceeded, redis will start LRU eviction of entries. This only applies to the sharded redis tables (task and object tables). - collect_profiling_data: Whether to collect profiling data. Note that - profiling data cannot be LRU evicted, so if you set - redis_max_memory then profiling should also be disabled to prevent - it from consuming all available redis memory. + collect_profiling_data: Whether to collect profiling data from workers. local_mode (bool): True if the code should be executed serially without Ray. This is useful for debugging. redirect_worker_output: True if the stdout and stderr of worker @@ -1655,10 +1652,7 @@ def init(redis_address=None, to use, or None for no limit. Once the limit is exceeded, redis will start LRU eviction of entries. This only applies to the sharded redis tables (task and object tables). - collect_profiling_data: Whether to collect profiling data. Note that - profiling data cannot be LRU evicted, so if you set - redis_max_memory then profiling should also be disabled to prevent - it from consuming all available redis memory. + collect_profiling_data: Whether to collect profiling data from workers. node_ip_address (str): The IP address of the node that we are on. object_id_seed (int): Used to seed the deterministic generation of object IDs. 
The same value can be used across multiple runs of the diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py index b9c9500e7087..f9af462fe269 100644 --- a/python/ray/workers/default_worker.py +++ b/python/ray/workers/default_worker.py @@ -55,6 +55,11 @@ type=str, default=ray_constants.LOGGER_FORMAT, help=ray_constants.LOGGER_FORMAT_HELP) +parser.add_argument( + "--collect-profiling-data", + type=bool, + default=True, + help="Whether to collect profiling data from workers.") parser.add_argument( "--temp-dir", required=False, From 99e490fe9501627948d85788dda2e571c8bd83a8 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 7 Dec 2018 18:01:07 -0800 Subject: [PATCH 03/16] format --- python/ray/scripts/scripts.py | 18 ++++++++---------- python/ray/services.py | 5 ++--- src/ray/raylet/node_manager.cc | 10 ++++++---- 3 files changed, 16 insertions(+), 17 deletions(-) diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 298270eb2040..b84db6757c86 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -120,11 +120,10 @@ def cli(logging_level, logging_format): "--redis-max-memory", required=False, type=int, - help=( - "The max amount of memory (in bytes) to allow redis to use, or None " - "for no limit. Once the limit is exceeded, redis will start LRU " - "eviction of entries. This only applies to the sharded " - "redis tables (task and object tables).")) + help=("The max amount of memory (in bytes) to allow redis to use, or None " + "for no limit. Once the limit is exceeded, redis will start LRU " + "eviction of entries. 
This only applies to the sharded " + "redis tables (task and object tables).")) @click.option( "--collect-profiling-data", default=True, @@ -220,11 +219,10 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, redis_max_clients, redis_password, redis_shard_ports, object_manager_port, node_manager_port, object_store_memory, redis_max_memory, collect_profiling_data, num_workers, num_cpus, - num_gpus, resources, head, no_ui, block, - plasma_directory, huge_pages, autoscaling_config, - no_redirect_worker_output, no_redirect_output, - plasma_store_socket_name, raylet_socket_name, temp_dir, - internal_config): + num_gpus, resources, head, no_ui, block, plasma_directory, + huge_pages, autoscaling_config, no_redirect_worker_output, + no_redirect_output, plasma_store_socket_name, raylet_socket_name, + temp_dir, internal_config): # Convert hostnames to numerical IP address. if node_ip_address is not None: node_ip_address = services.address_to_ip(node_ip_address) diff --git a/python/ray/services.py b/python/ray/services.py index 690364155a04..0329d5c206b5 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -1397,9 +1397,8 @@ def start_ray_processes(address_info=None, resources = num_local_schedulers * [resources] if redis_max_memory and collect_profiling_data: - logger.warn( - "Profiling data cannot be LRU evicted, so it is disabled " - "when redis_max_memory is set.") + logger.warn("Profiling data cannot be LRU evicted, so it is disabled " + "when redis_max_memory is set.") collect_profiling_data = False if num_workers is not None: diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 26bec4283afe..aa49bcd3b95e 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1543,10 +1543,12 @@ void NodeManager::HandleTaskReconstruction(const TaskID &task_id) { [this](ray::gcs::AsyncGcsClient *client, const TaskID &task_id) { // The task was not in the GCS task table. 
It must therefore be in the // lineage cache. - RAY_CHECK(lineage_cache_.ContainsTask(task_id)) << - "Task metadata not found in either GCS or lineage cache. It may have been evicted " << - "by the redis LRU configuration. Consider increasing the memory allocation via " << - "ray.init(redis_max_memory=)."; + RAY_CHECK(lineage_cache_.ContainsTask(task_id)) + << "Task metadata not found in either GCS or lineage cache. It may have been " + "evicted " + << "by the redis LRU configuration. Consider increasing the memory " + "allocation via " + << "ray.init(redis_max_memory=)."; // Use a copy of the cached task spec to re-execute the task. const Task task = lineage_cache_.GetTask(task_id); ResubmitTask(task); From 8f431500cbfa4037be7aa048218d0636a418e8be Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 7 Dec 2018 18:12:47 -0800 Subject: [PATCH 04/16] wip --- python/ray/profiling.py | 13 +++++++++++++ python/ray/services.py | 8 ++------ python/ray/worker.py | 15 +++++++++++++-- python/ray/workers/default_worker.py | 8 +++++--- 4 files changed, 33 insertions(+), 11 deletions(-) diff --git a/python/ray/profiling.py b/python/ray/profiling.py index 42b02f8926be..8cdd8296ed61 100644 --- a/python/ray/profiling.py +++ b/python/ray/profiling.py @@ -128,6 +128,19 @@ def add_event(self, event): self.events.append(event) +class NoopProfiler(object): + """A no-op profile used when collect_profile_data=False.""" + + def start_flush_thread(self): + pass + + def flush_profile_data(self): + pass + + def add_event(self, event): + pass + + class RayLogSpanRaylet(object): """An object used to enable logging a span of events with a with statement. 
diff --git a/python/ray/services.py b/python/ray/services.py index 0329d5c206b5..e96196b5f946 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -950,7 +950,8 @@ def start_raylet(redis_address, "--temp-dir={}".format( sys.executable, worker_path, node_ip_address, plasma_store_name, raylet_name, redis_address, - collect_profiling_data, get_temp_root())) + "1" if collect_profiling_data else "0", + get_temp_root())) if redis_password: start_worker_command += " --redis-password {}".format(redis_password) @@ -1396,11 +1397,6 @@ def start_ray_processes(address_info=None, if not isinstance(resources, list): resources = num_local_schedulers * [resources] - if redis_max_memory and collect_profiling_data: - logger.warn("Profiling data cannot be LRU evicted, so it is disabled " - "when redis_max_memory is set.") - collect_profiling_data = False - if num_workers is not None: raise Exception("The 'num_workers' argument is deprecated. Please use " "'num_cpus' instead.") diff --git a/python/ray/worker.py b/python/ray/worker.py index c70acd0d5f34..51840cdabf36 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -217,7 +217,7 @@ def __init__(self): # When the worker is constructed. Record the original value of the # CUDA_VISIBLE_DEVICES environment variable. self.original_gpu_ids = ray.utils.get_cuda_visible_devices() - self.profiler = profiling.Profiler(self) + self.profiler = None self.memory_monitor = memory_monitor.MemoryMonitor() self.state_lock = threading.Lock() # A dictionary that maps from driver id to SerializationContext @@ -1447,6 +1447,11 @@ def _init(address_info=None, else: driver_mode = SCRIPT_MODE + if redis_max_memory and collect_profiling_data: + logger.warn("Profiling data cannot be LRU evicted, so it is disabled " + "when redis_max_memory is set.") + collect_profiling_data = False + # Get addresses of existing services. 
if address_info is None: address_info = {} @@ -1568,7 +1573,8 @@ def _init(address_info=None, "node_ip_address": node_ip_address, "redis_address": address_info["redis_address"], "store_socket_name": address_info["object_store_addresses"][0], - "webui_url": address_info["webui_url"] + "webui_url": address_info["webui_url"], + "collect_profiling_data": collect_profiling_data, } driver_address_info["raylet_socket_name"] = ( address_info["raylet_socket_names"][0]) @@ -1956,6 +1962,11 @@ def connect(info, # Enable nice stack traces on SIGSEGV etc. faulthandler.enable(all_threads=False) + if info["collect_profiling_data"]: + worker.profiler = profiling.Profiler(worker) + else: + worker.profiler = profiling.NoopProfiler() + # Initialize some fields. if mode is WORKER_MODE: worker.worker_id = random_string() diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py index f9af462fe269..14828bb81f62 100644 --- a/python/ray/workers/default_worker.py +++ b/python/ray/workers/default_worker.py @@ -57,8 +57,8 @@ help=ray_constants.LOGGER_FORMAT_HELP) parser.add_argument( "--collect-profiling-data", - type=bool, - default=True, + type=int, # but argparse can't handle bool values + default=1, help="Whether to collect profiling data from workers.") parser.add_argument( "--temp-dir", @@ -76,8 +76,10 @@ "redis_password": args.redis_password, "store_socket_name": args.object_store_name, "manager_socket_name": args.object_store_manager_name, - "raylet_socket_name": args.raylet_name + "raylet_socket_name": args.raylet_name, + "collect_profiling_data": args.collect_profiling_data, } + print(info) logging.basicConfig( level=logging.getLevelName(args.logging_level.upper()), From c29252adcd4fbfaa693ba50730045a50b279daf7 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 7 Dec 2018 18:15:02 -0800 Subject: [PATCH 05/16] note --- python/ray/memory_monitor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/memory_monitor.py 
b/python/ray/memory_monitor.py index 00cf86816dbf..a52f98d7077d 100644 --- a/python/ray/memory_monitor.py +++ b/python/ray/memory_monitor.py @@ -37,7 +37,8 @@ def get_message(used_gb, total_gb, threshold): round(psutil.virtual_memory().shared / 1e9, 2)) + "currently being used by the Ray object store. You can set " "the object store size with the `object_store_memory` " - "parameter when starting Ray.") + "parameter when starting Ray, and the max Redis size with " + "`redis_max_memory`.") class MemoryMonitor(object): From 0266f639253a747f918d99acbd52e3a14e3e2d1e Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 7 Dec 2018 18:15:23 -0800 Subject: [PATCH 06/16] lint --- python/ray/workers/default_worker.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py index 14828bb81f62..87d985770e9b 100644 --- a/python/ray/workers/default_worker.py +++ b/python/ray/workers/default_worker.py @@ -79,7 +79,6 @@ "raylet_socket_name": args.raylet_name, "collect_profiling_data": args.collect_profiling_data, } - print(info) logging.basicConfig( level=logging.getLevelName(args.logging_level.upper()), From 9909466fe86ec1888b2afeb22134a0e0b428ba7d Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 7 Dec 2018 18:15:39 -0800 Subject: [PATCH 07/16] fix --- python/ray/workers/default_worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py index 87d985770e9b..c46eafcecfc5 100644 --- a/python/ray/workers/default_worker.py +++ b/python/ray/workers/default_worker.py @@ -57,7 +57,7 @@ help=ray_constants.LOGGER_FORMAT_HELP) parser.add_argument( "--collect-profiling-data", - type=int, # but argparse can't handle bool values + type=int, # int since argparse can't handle bool values default=1, help="Whether to collect profiling data from workers.") parser.add_argument( From b499d4255c92109a571b1c493f8ac78cf92f0b70 Mon Sep 17 
00:00:00 2001 From: Eric Liang Date: Fri, 7 Dec 2018 18:38:19 -0800 Subject: [PATCH 08/16] flag --- python/ray/rllib/train.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/python/ray/rllib/train.py b/python/ray/rllib/train.py index a1d8e13a1f57..b44d059cced4 100755 --- a/python/ray/rllib/train.py +++ b/python/ray/rllib/train.py @@ -56,6 +56,12 @@ def create_parser(parser_creator=None): default=None, type=int, help="Emulate multiple cluster nodes for debugging.") + parser.add_argument( + "--ray-max-redis-memory", + default=None, + type=int, + help="--max-redis-memory to pass to Ray." + " This only has an affect in local mode.") parser.add_argument( "--ray-object-store-memory", default=None, @@ -122,12 +128,14 @@ def run(args, parser): "num_cpus": args.ray_num_cpus or 1, "num_gpus": args.ray_num_gpus or 0, }, - object_store_memory=args.ray_object_store_memory) + object_store_memory=args.ray_object_store_memory, + redis_max_memory=args.ray_redis_max_memory) ray.init(redis_address=cluster.redis_address) else: ray.init( redis_address=args.redis_address, object_store_memory=args.ray_object_store_memory, + redis_max_memory=args.ray_redis_max_memory, num_cpus=args.ray_num_cpus, num_gpus=args.ray_num_gpus) run_experiments( From ffe6fed4f7040aa12a588064004f7c8730b298b5 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 7 Dec 2018 18:42:14 -0800 Subject: [PATCH 09/16] typo --- python/ray/rllib/train.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/rllib/train.py b/python/ray/rllib/train.py index b44d059cced4..dc78921099d3 100755 --- a/python/ray/rllib/train.py +++ b/python/ray/rllib/train.py @@ -57,10 +57,10 @@ def create_parser(parser_creator=None): type=int, help="Emulate multiple cluster nodes for debugging.") parser.add_argument( - "--ray-max-redis-memory", + "--ray-redis-max-memory", default=None, type=int, - help="--max-redis-memory to pass to Ray." + help="--redis-max-memory to pass to Ray." 
" This only has an affect in local mode.") parser.add_argument( "--ray-object-store-memory", From 9243404892e1ae2fd0cf7e26d911c9b45fe97265 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 7 Dec 2018 22:22:07 -0800 Subject: [PATCH 10/16] raise timeout --- src/ray/ray_config.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ray/ray_config.h b/src/ray/ray_config.h index 480c976a5def..60472226fc8a 100644 --- a/src/ray/ray_config.h +++ b/src/ray/ray_config.h @@ -199,8 +199,8 @@ class RayConfig { RayConfig() : ray_protocol_version_(0x0000000000000000), handler_warning_timeout_ms_(100), - heartbeat_timeout_milliseconds_(100), - num_heartbeats_timeout_(100), + heartbeat_timeout_milliseconds_(1000), + num_heartbeats_timeout_(30), num_heartbeats_warning_(5), debug_dump_period_milliseconds_(10000), initial_reconstruction_timeout_milliseconds_(10000), From 61278932e87bb1ce8fdc8068a3b38c8e01d07420 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 7 Dec 2018 22:23:50 -0800 Subject: [PATCH 11/16] fix --- src/ray/ray_config.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ray/ray_config.h b/src/ray/ray_config.h index 60472226fc8a..97767fc3f396 100644 --- a/src/ray/ray_config.h +++ b/src/ray/ray_config.h @@ -199,8 +199,8 @@ class RayConfig { RayConfig() : ray_protocol_version_(0x0000000000000000), handler_warning_timeout_ms_(100), - heartbeat_timeout_milliseconds_(1000), - num_heartbeats_timeout_(30), + heartbeat_timeout_milliseconds_(100), + num_heartbeats_timeout_(300), num_heartbeats_warning_(5), debug_dump_period_milliseconds_(10000), initial_reconstruction_timeout_milliseconds_(10000), From c164419d47332a3e99de0a448703cab6fd04f7a3 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 7 Dec 2018 23:10:48 -0800 Subject: [PATCH 12/16] optional get --- python/ray/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/worker.py b/python/ray/worker.py index 51840cdabf36..99b99d2f0fb9 100644 
--- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1962,7 +1962,7 @@ def connect(info, # Enable nice stack traces on SIGSEGV etc. faulthandler.enable(all_threads=False) - if info["collect_profiling_data"]: + if info.get("collect_profiling_data"): worker.profiler = profiling.Profiler(worker) else: worker.profiler = profiling.NoopProfiler() From 8202cc10d2dfc9d22e14236d8e6cf303891e2c46 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 7 Dec 2018 23:14:32 -0800 Subject: [PATCH 13/16] fix flag --- python/ray/worker.py | 10 ++++++---- python/ray/workers/default_worker.py | 6 ++++-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/python/ray/worker.py b/python/ray/worker.py index 99b99d2f0fb9..de2513780ad5 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1574,7 +1574,6 @@ def _init(address_info=None, "redis_address": address_info["redis_address"], "store_socket_name": address_info["object_store_addresses"][0], "webui_url": address_info["webui_url"], - "collect_profiling_data": collect_profiling_data, } driver_address_info["raylet_socket_name"] = ( address_info["raylet_socket_names"][0]) @@ -1587,7 +1586,8 @@ def _init(address_info=None, mode=driver_mode, worker=global_worker, driver_id=driver_id, - redis_password=redis_password) + redis_password=redis_password, + collect_profiling_data=collect_profiling_data) return address_info @@ -1940,7 +1940,8 @@ def connect(info, mode=WORKER_MODE, worker=global_worker, driver_id=None, - redis_password=None): + redis_password=None, + collect_profiling_data=True): """Connect this worker to the local scheduler, to Plasma, and to Redis. Args: @@ -1953,6 +1954,7 @@ def connect(info, driver_id: The ID of driver. If it's None, then we will generate one. redis_password (str): Prevents external clients without the password from connecting to Redis if provided. + collect_profiling_data: Whether to collect profiling data from workers. 
""" # Do some basic checking to make sure we didn't call ray.init twice. error_message = "Perhaps you called ray.init twice by accident?" @@ -1962,7 +1964,7 @@ def connect(info, # Enable nice stack traces on SIGSEGV etc. faulthandler.enable(all_threads=False) - if info.get("collect_profiling_data"): + if collect_profiling_data: worker.profiler = profiling.Profiler(worker) else: worker.profiler = profiling.NoopProfiler() diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py index c46eafcecfc5..dc1085783b8a 100644 --- a/python/ray/workers/default_worker.py +++ b/python/ray/workers/default_worker.py @@ -77,7 +77,6 @@ "store_socket_name": args.object_store_name, "manager_socket_name": args.object_store_manager_name, "raylet_socket_name": args.raylet_name, - "collect_profiling_data": args.collect_profiling_data, } logging.basicConfig( @@ -88,7 +87,10 @@ tempfile_services.set_temp_root(args.temp_dir) ray.worker.connect( - info, mode=ray.WORKER_MODE, redis_password=args.redis_password) + info, + mode=ray.WORKER_MODE, + redis_password=args.redis_password, + collect_profiling_data=args.collect_profiling_data) error_explanation = """ This error is unexpected and should not have happened. Somehow a worker From 73fde90a211ecf354db254ca428fb406821b419a Mon Sep 17 00:00:00 2001 From: Philipp Moritz Date: Sat, 8 Dec 2018 19:53:20 -0800 Subject: [PATCH 14/16] increase timeout in test --- test/failure_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/failure_test.py b/test/failure_test.py index 027ed38d6411..bb128400a290 100644 --- a/test/failure_test.py +++ b/test/failure_test.py @@ -594,7 +594,7 @@ def test_warning_for_dead_node(ray_start_two_nodes): ray.services.all_processes[ray.services.PROCESS_TYPE_RAYLET][0].kill() # Check that we get warning messages for both raylets. 
- wait_for_errors(ray_constants.REMOVED_NODE_ERROR, 2, timeout=20) + wait_for_errors(ray_constants.REMOVED_NODE_ERROR, 2, timeout=40) # Extract the client IDs from the error messages. This will need to be # changed if the error message changes. From d210a9973d4a742529b25c293e0247b03bb1d9e8 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Sun, 9 Dec 2018 00:45:11 -0800 Subject: [PATCH 15/16] update docs --- doc/source/redis-memory-management.rst | 93 ++------------------------ 1 file changed, 5 insertions(+), 88 deletions(-) diff --git a/doc/source/redis-memory-management.rst b/doc/source/redis-memory-management.rst index 91b207db54e1..5e6edcc02f6c 100644 --- a/doc/source/redis-memory-management.rst +++ b/doc/source/redis-memory-management.rst @@ -7,92 +7,9 @@ servers, as described in `An Overview of the Internals task/object generation rate could risk high memory pressure, potentially leading to out-of-memory (OOM) errors. -Here, we describe an experimental feature that transparently flushes metadata -entries out of Redis memory. +In Ray `0.6.1+` Redis shards can be configured to LRU evict task and object +metadata by setting ``redis_max_memory`` when starting Ray. This supersedes the +previously documented flushing functionality. -Requirements ------------- - -As of early July 2018, the automatic memory management feature requires building -Ray from source. We are planning on eliminating this step in the near future by -releasing official wheels. - -Building Ray -~~~~~~~~~~~~ - -First, follow `instructions to build Ray from source -`__ to install prerequisites. After -the prerequisites are installed, instead of doing the regular ``pip install`` as -referenced in that document, pass an additional special flag, -``RAY_USE_NEW_GCS=on``: -
.. code-block:: bash - - git clone https://github.com/ray-project/ray.git - cd ray/python - RAY_USE_NEW_GCS=on pip install -e . --verbose # Add --user if you see a permission denied error. 
- -Running Ray applications -~~~~~~~~~~~~~~~~~~~~~~~~ - -At run time the environment variables ``RAY_USE_NEW_GCS=on`` and -``RAY_USE_XRAY=1`` are required. - -.. code-block:: bash - - export RAY_USE_NEW_GCS=on - export RAY_USE_XRAY=1 - python my_ray_script.py # Or launch python/ipython. - -Activate memory flushing ------------------------- - -After building Ray using the method above, simply add these two lines after -``ray.init()`` to activate automatic memory flushing: - -.. code-block:: python - - ray.init(...) - - policy = ray.experimental.SimpleGcsFlushPolicy() - ray.experimental.set_flushing_policy(policy) - - # My awesome Ray application logic follows. - -Paramaters of the flushing policy ---------------------------------- - -There are three `user-configurable parameters -`_ -of the ``SimpleGcsFlushPolicy``: - -* ``flush_when_at_least_bytes``: Wait until this many bytes of memory usage - accumulated in the redis server before flushing kicks in. -* ``flush_period_secs``: Issue a flush to the Redis server every this many - seconds. -* ``flush_num_entries_each_time``: A hint to the system on the number of entries - to flush on each request. - -The default values should serve to be non-invasive for lightweight Ray -applications. ``flush_when_at_least_bytes`` is set to ``(1<<31)`` or 2GB, -``flush_period_secs`` to 10, and ``flush_num_entries_each_time`` to 10000: - -.. code-block:: python - - # Default parameters. - ray.experimental.SimpleGcsFlushPolicy( - flush_when_at_least_bytes=(1 << 31), - flush_period_secs=10, - flush_num_entries_each_time=10000) - -In particular, these default values imply that - -1. the Redis server would accumulate memory usage up to 2GB without any entries -being flushed, then the flushing would kick in; and - -2. generally, "older" metadata entries would be flushed first, and the Redis -server would always keep the most recent window of metadata of 2GB in size. 
- -**For advanced users.** Advanced users can tune the above parameters to their -applications' needs; note that the desired flush rate is equal to (flush -period) * (num entries each flush). +Note that profiling is disabled when ``redis_max_memory`` is set. This is because +profiling data cannot be LRU evicted. From 4d9cf394faae6a5277ff4f9e868c04b529b91306 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Sun, 9 Dec 2018 00:51:11 -0800 Subject: [PATCH 16/16] format --- python/ray/rllib/train.py | 15 ++++++--------- test/failure_test.py | 9 ++++++++- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/python/ray/rllib/train.py b/python/ray/rllib/train.py index dc78921099d3..72d6fc0b58c3 100755 --- a/python/ray/rllib/train.py +++ b/python/ray/rllib/train.py @@ -38,19 +38,18 @@ def create_parser(parser_creator=None): "--redis-address", default=None, type=str, - help="The Redis address of the cluster.") + help="Connect to an existing Ray cluster at this address instead " + "of starting a new one.") parser.add_argument( "--ray-num-cpus", default=None, type=int, - help="--num-cpus to pass to Ray." - " This only has an affect in local mode.") + help="--num-cpus to use if starting a new cluster.") parser.add_argument( "--ray-num-gpus", default=None, type=int, - help="--num-gpus to pass to Ray." - " This only has an affect in local mode.") + help="--num-gpus to use if starting a new cluster.") parser.add_argument( "--ray-num-local-schedulers", default=None, @@ -60,14 +59,12 @@ def create_parser(parser_creator=None): "--ray-redis-max-memory", default=None, type=int, - help="--redis-max-memory to pass to Ray." - " This only has an affect in local mode.") + help="--redis-max-memory to use if starting a new cluster.") parser.add_argument( "--ray-object-store-memory", default=None, type=int, - help="--object-store-memory to pass to Ray." 
- " This only has an affect in local mode.") + help="--object-store-memory to use if starting a new cluster.") parser.add_argument( "--experiment-name", default="default", diff --git a/test/failure_test.py b/test/failure_test.py index bb128400a290..2c4a92bd496c 100644 --- a/test/failure_test.py +++ b/test/failure_test.py @@ -3,6 +3,7 @@ from __future__ import print_function import numpy as np +import json import os import ray import sys @@ -570,7 +571,13 @@ class Foo(object): @pytest.fixture def ray_start_two_nodes(): # Start the Ray processes. - ray.worker._init(start_ray_local=True, num_local_schedulers=2, num_cpus=0) + ray.worker._init( + start_ray_local=True, + num_local_schedulers=2, + num_cpus=0, + _internal_config=json.dumps({ + "num_heartbeats_timeout": 40 + })) yield None # The code after the yield will run as teardown code. ray.shutdown()