diff --git a/doc/source/redis-memory-management.rst b/doc/source/redis-memory-management.rst index 5e6edcc02f6c..5ef612fa1727 100644 --- a/doc/source/redis-memory-management.rst +++ b/doc/source/redis-memory-management.rst @@ -8,8 +8,8 @@ task/object generation rate could risk high memory pressure, potentially leading to out-of-memory (OOM) errors. In Ray `0.6.1+` Redis shards can be configured to LRU evict task and object -metadata by setting ``redis_max_memory`` when starting Ray. This supercedes the -previously documented flushing functionality. +metadata by setting ``redis_max_memory_bytes`` when starting Ray. This +supersedes the previously documented flushing functionality. -Note that profiling is disabled when ``redis_max_memory`` is set. This is because -profiling data cannot be LRU evicted. +Note that profiling is disabled when ``redis_max_memory_bytes`` is set. This is +because profiling data cannot be LRU evicted. diff --git a/python/benchmarks/benchmark_wait.py b/python/benchmarks/benchmark_wait.py index 614d76a38c54..767557c653a6 100644 --- a/python/benchmarks/benchmark_wait.py +++ b/python/benchmarks/benchmark_wait.py @@ -33,7 +33,7 @@ def time_wait_many_tasks(self, num_returns): time_wait_many_tasks.param_names = ["num_returns"] def time_wait_timeout(self, timeout): - ray.wait([sleep.remote(0.5)], timeout=timeout) + ray.wait([sleep.remote(0.5)], timeout_milliseconds=timeout) time_wait_timeout.params = [200, 800] time_wait_timeout.param_names = ["timeout_ms"] diff --git a/python/ray/experimental/api.py b/python/ray/experimental/api.py index 9891ecff73eb..015251a62b55 100644 --- a/python/ray/experimental/api.py +++ b/python/ray/experimental/api.py @@ -41,7 +41,7 @@ def get(object_ids, worker=None): return ray.get(object_ids, worker) -def wait(object_ids, num_returns=1, timeout=None, worker=None): +def wait(object_ids, num_returns=1, timeout_milliseconds=None, worker=None): """Return a list of IDs that are ready and a list of IDs that are not. This method is identical to `ray.wait` except it adds support for tuples @@ -52,8 +52,8 @@ def wait(object_ids, num_returns=1, timeout=None, worker=None): List like of object IDs for objects that may or may not be ready. Note that these IDs must be unique. num_returns (int): The number of object IDs that should be returned. - timeout (int): The maximum amount of time in milliseconds to wait - before returning. + timeout_milliseconds (int): The maximum amount of time in milliseconds + to wait before returning. 
Returns: A list of object IDs that are ready and a list of the remaining object @@ -61,6 +61,7 @@ def wait(object_ids, num_returns=1, timeout=None, worker=None): """ worker = ray.worker.global_worker if worker is None else worker if isinstance(object_ids, (tuple, np.ndarray)): - return ray.wait(list(object_ids), num_returns, timeout, worker) + return ray.wait( + list(object_ids), num_returns, timeout_milliseconds, worker) - return ray.wait(object_ids, num_returns, timeout, worker) + return ray.wait(object_ids, num_returns, timeout_milliseconds, worker) diff --git a/python/ray/experimental/async_plasma.py b/python/ray/experimental/async_plasma.py index 2c0f806f2467..9e24bfee8108 100644 --- a/python/ray/experimental/async_plasma.py +++ b/python/ray/experimental/async_plasma.py @@ -219,7 +219,7 @@ def as_future(self, object_id, check_ready=True): fut = PlasmaObjectFuture(loop=self._loop, object_id=plain_object_id) if check_ready: - ready, _ = ray.wait([object_id], timeout=0) + ready, _ = ray.wait([object_id], timeout_milliseconds=0) if ready: if self._loop.get_debug(): logger.debug("%s has been ready.", plain_object_id) diff --git a/python/ray/experimental/sgd/test_sgd.py b/python/ray/experimental/sgd/test_sgd.py index 79e00b2656ba..7dc8cffaf1db 100755 --- a/python/ray/experimental/sgd/test_sgd.py +++ b/python/ray/experimental/sgd/test_sgd.py @@ -20,7 +20,7 @@ parser.add_argument("--devices-per-worker", default=2, type=int) parser.add_argument("--stats-interval", default=10, type=int) parser.add_argument("--all-reduce-alg", default="simple", type=str) -parser.add_argument("--object-store-memory", default=None, type=int) +parser.add_argument("--object-store-memory-bytes", default=None, type=int) parser.add_argument( "--warmup", action="store_true", help="Warm up object store before start.") parser.add_argument( @@ -32,7 +32,7 @@ args, _ = parser.parse_known_args() ray.init( redis_address=args.redis_address, - object_store_memory=args.object_store_memory) + object_store_memory_bytes=args.object_store_memory_bytes) model_creator = ( lambda worker_idx, device_idx: TFBenchModel( diff --git a/python/ray/memory_monitor.py b/python/ray/memory_monitor.py index a52f98d7077d..f0974df461aa 100644 --- a/python/ray/memory_monitor.py +++ b/python/ray/memory_monitor.py @@ -36,9 +36,9 @@ def get_message(used_gb, total_gb, threshold): "\n\nIn addition, ~{} GB of shared memory is ".format( round(psutil.virtual_memory().shared / 1e9, 2)) + "currently being used by the Ray object store. 
You can set " - "the object store size with the `object_store_memory` " + "the object store size with the `object_store_memory_bytes` " "parameter when starting Ray, and the max Redis size with " - "`redis_max_memory`.") + "`redis_max_memory_bytes`.") class MemoryMonitor(object): diff --git a/python/ray/rllib/evaluation/metrics.py b/python/ray/rllib/evaluation/metrics.py index 1b270be3738c..825ab75b801f 100644 --- a/python/ray/rllib/evaluation/metrics.py +++ b/python/ray/rllib/evaluation/metrics.py @@ -32,7 +32,9 @@ def collect_episodes(local_evaluator, for a in remote_evaluators ] collected, _ = ray.wait( - pending, num_returns=len(pending), timeout=timeout_seconds * 1000) + pending, + num_returns=len(pending), + timeout_milliseconds=timeout_seconds * 1000) num_metric_batches_dropped = len(pending) - len(collected) metric_lists = ray.get(collected) diff --git a/python/ray/rllib/train.py b/python/ray/rllib/train.py index d9f7cf58e0b4..09fdc6db3a33 100755 --- a/python/ray/rllib/train.py +++ b/python/ray/rllib/train.py @@ -56,15 +56,15 @@ def create_parser(parser_creator=None): type=int, help="Emulate multiple cluster nodes for debugging.") parser.add_argument( - "--ray-redis-max-memory", + "--ray-redis-max-memory-bytes", default=None, type=int, - help="--redis-max-memory to use if starting a new cluster.") + help="--redis-max-memory-bytes to use if starting a new cluster.") parser.add_argument( - "--ray-object-store-memory", + "--ray-object-store-memory-bytes", default=None, type=int, - help="--object-store-memory to use if starting a new cluster.") + help="--object-store-memory-bytes to use if starting a new cluster.") parser.add_argument( "--experiment-name", default="default", @@ -125,14 +125,14 @@ def run(args, parser): "num_cpus": args.ray_num_cpus or 1, "num_gpus": args.ray_num_gpus or 0, }, - object_store_memory=args.ray_object_store_memory, - redis_max_memory=args.ray_redis_max_memory) + object_store_memory_bytes=args.ray_object_store_memory_bytes, + redis_max_memory_bytes=args.ray_redis_max_memory_bytes) ray.init(redis_address=cluster.redis_address) else: ray.init( redis_address=args.redis_address, - object_store_memory=args.ray_object_store_memory, - redis_max_memory=args.ray_redis_max_memory, + object_store_memory_bytes=args.ray_object_store_memory_bytes, + redis_max_memory_bytes=args.ray_redis_max_memory_bytes, num_cpus=args.ray_num_cpus, num_gpus=args.ray_num_gpus) run_experiments( diff --git a/python/ray/rllib/utils/actors.py b/python/ray/rllib/utils/actors.py index 689aa945cabf..e51346661696 100644 --- a/python/ray/rllib/utils/actors.py +++ b/python/ray/rllib/utils/actors.py @@ -28,7 +28,8 @@ def add(self, worker, all_obj_ids): def completed(self): pending = list(self._tasks) if pending: - ready, _ = ray.wait(pending, num_returns=len(pending), timeout=10) + ready, _ = ray.wait( + pending, num_returns=len(pending), timeout_milliseconds=10) for obj_id in ready: yield (self._tasks.pop(obj_id), self._objects.pop(obj_id)) diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index b84db6757c86..01161537bf6d 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -114,12 +114,22 @@ def cli(logging_level, logging_format): "--object-store-memory", required=False, type=int, + help="deprecated, use --object-store-memory-bytes") +@click.option( + "--object-store-memory-bytes", + required=False, + type=int, help="the maximum amount of memory (in bytes) to allow the " "object store to use") @click.option( "--redis-max-memory", required=False, 
type=int, + help="deprecated, use --redis-max-memory-bytes") +@click.option( + "--redis-max-memory-bytes", + required=False, + type=int, help=("The max amount of memory (in bytes) to allow redis to use, or None " "for no limit. Once the limit is exceeded, redis will start LRU " "eviction of entries. This only applies to the sharded " @@ -130,8 +140,8 @@ def cli(logging_level, logging_format): type=bool, help=("Whether to collect profiling data. Note that " "profiling data cannot be LRU evicted, so if you set " - "redis_max_memory then profiling will also be disabled to prevent " - "it from consuming all available redis memory.")) + "redis_max_memory_bytes then profiling will also be disabled to " + "prevent it from consuming all available redis memory.")) @click.option( "--num-workers", required=False, @@ -218,11 +228,22 @@ def cli(logging_level, logging_format): def start(node_ip_address, redis_address, redis_port, num_redis_shards, redis_max_clients, redis_password, redis_shard_ports, object_manager_port, node_manager_port, object_store_memory, - redis_max_memory, collect_profiling_data, num_workers, num_cpus, - num_gpus, resources, head, no_ui, block, plasma_directory, - huge_pages, autoscaling_config, no_redirect_worker_output, - no_redirect_output, plasma_store_socket_name, raylet_socket_name, - temp_dir, internal_config): + object_store_memory_bytes, redis_max_memory, redis_max_memory_bytes, + collect_profiling_data, num_workers, num_cpus, num_gpus, resources, + head, no_ui, block, plasma_directory, huge_pages, autoscaling_config, + no_redirect_worker_output, no_redirect_output, + plasma_store_socket_name, raylet_socket_name, temp_dir, + internal_config): + if object_store_memory is not None: + logger.warning("WARNING: The --object-store-memory argument has been " + "deprecated. Please use --object-store-memory-bytes.") + object_store_memory_bytes = object_store_memory + + if redis_max_memory is not None: + logger.warning("WARNING: The --redis-max-memory argument has been " + "deprecated. Please use --redis-max-memory-bytes.") + redis_max_memory_bytes = redis_max_memory + # Convert hostnames to numerical IP address. if node_ip_address is not None: node_ip_address = services.address_to_ip(node_ip_address) @@ -277,8 +298,8 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, node_ip_address=node_ip_address, redis_port=redis_port, redis_shard_ports=redis_shard_ports, - object_store_memory=object_store_memory, - redis_max_memory=redis_max_memory, + object_store_memory_bytes=object_store_memory_bytes, + redis_max_memory_bytes=redis_max_memory_bytes, collect_profiling_data=collect_profiling_data, num_workers=num_workers, cleanup=False, @@ -364,7 +385,7 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, object_manager_ports=[object_manager_port], node_manager_ports=[node_manager_port], num_workers=num_workers, - object_store_memory=object_store_memory, + object_store_memory_bytes=object_store_memory_bytes, redis_password=redis_password, cleanup=False, redirect_worker_output=not no_redirect_worker_output, diff --git a/python/ray/services.py b/python/ray/services.py index 77138715de57..ecd900c0adb6 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -417,7 +417,7 @@ def start_redis(node_ip_address, cleanup=True, password=None, use_credis=None, - redis_max_memory=None): + redis_max_memory_bytes=None): """Start the Redis global state store. 
Args: @@ -446,9 +446,9 @@ def start_redis(node_ip_address, use_credis: If True, additionally load the chain-replicated libraries into the redis servers. Defaults to None, which means its value is set by the presence of "RAY_USE_NEW_GCS" in os.environ. - redis_max_memory: The max amount of memory (in bytes) to allow redis - to use, or None for no limit. Once the limit is exceeded, redis - will start LRU eviction of entries. This only applies to the + redis_max_memory_bytes: The max amount of memory (in bytes) to allow + redis to use, or None for no limit. Once the limit is exceeded, + redis will start LRU eviction of entries. This only applies to the sharded redis tables (task and object tables). Returns: @@ -481,7 +481,7 @@ def start_redis(node_ip_address, stderr_file=redis_stderr_file, cleanup=cleanup, password=password, - redis_max_memory=None) + redis_max_memory_bytes=None) else: assigned_port, _ = _start_redis_instance( node_ip_address=node_ip_address, @@ -496,7 +496,7 @@ def start_redis(node_ip_address, # supplies. modules=[CREDIS_MASTER_MODULE, REDIS_MODULE], password=password, - redis_max_memory=None) + redis_max_memory_bytes=None) if port is not None: assert assigned_port == port port = assigned_port @@ -531,7 +531,7 @@ def start_redis(node_ip_address, stderr_file=redis_stderr_file, cleanup=cleanup, password=password, - redis_max_memory=redis_max_memory) + redis_max_memory_bytes=redis_max_memory_bytes) else: assert num_redis_shards == 1, \ "For now, RAY_USE_NEW_GCS supports 1 shard, and credis "\ @@ -549,7 +549,7 @@ def start_redis(node_ip_address, # module, as the latter contains an extern declaration that the # former supplies. modules=[CREDIS_MEMBER_MODULE, REDIS_MODULE], - redis_max_memory=redis_max_memory) + redis_max_memory_bytes=redis_max_memory_bytes) if redis_shard_ports[i] is not None: assert redis_shard_port == redis_shard_ports[i] @@ -580,7 +580,7 @@ def _start_redis_instance(node_ip_address="127.0.0.1", password=None, executable=REDIS_EXECUTABLE, modules=None, - redis_max_memory=None): + redis_max_memory_bytes=None): """Start a single Redis server. Args: @@ -604,9 +604,9 @@ def _start_redis_instance(node_ip_address="127.0.0.1", modules (list of str): A list of pathnames, pointing to the redis module(s) that will be loaded in this redis server. If None, load the default Ray redis module. - redis_max_memory: The max amount of memory (in bytes) to allow redis - to use, or None for no limit. Once the limit is exceeded, redis - will start LRU eviction of entries. + redis_max_memory_bytes: The max amount of memory (in bytes) to allow + redis to use, or None for no limit. Once the limit is exceeded, + redis will start LRU eviction of entries. Returns: A tuple of the port used by Redis and a handle to the process that was @@ -671,12 +671,12 @@ def _start_redis_instance(node_ip_address="127.0.0.1", redis_client.config_set("protected-mode", "no") # Discard old task and object metadata. - if redis_max_memory is not None: - redis_client.config_set("maxmemory", str(redis_max_memory)) + if redis_max_memory_bytes is not None: + redis_client.config_set("maxmemory", str(redis_max_memory_bytes)) redis_client.config_set("maxmemory-policy", "allkeys-lru") redis_client.config_set("maxmemory-samples", "10") logger.info("Starting Redis shard with {} GB max memory.".format( - round(redis_max_memory / 1e9, 2))) + round(redis_max_memory_bytes / 1e9, 2))) # If redis_max_clients is provided, attempt to raise the number of maximum # number of Redis clients. 
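The hunk above shows the eviction policy each sharded Redis instance receives once `redis_max_memory_bytes` is set. For readers unfamiliar with those settings, here is a minimal standalone sketch of the same configuration using redis-py; the shard address and the 500 MB cap are hypothetical values, not part of this patch.

import redis

# Mirrors the config_set calls made by _start_redis_instance above when
# redis_max_memory_bytes is provided. Host, port, and the cap are placeholders.
shard = redis.StrictRedis(host="127.0.0.1", port=6380)
shard.config_set("maxmemory", str(500 * 10**6))      # hard cap on the shard's memory
shard.config_set("maxmemory-policy", "allkeys-lru")  # evict least-recently-used keys first
shard.config_set("maxmemory-samples", "10")          # sample size per eviction decision

With this policy in place, old task and object metadata is dropped before a shard can exhaust its memory budget, which is the behavior the documentation change at the top of this patch describes.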
@@ -1016,7 +1016,7 @@ def start_raylet(redis_address, return raylet_name -def determine_plasma_store_config(object_store_memory=None, +def determine_plasma_store_config(object_store_memory_bytes=None, plasma_directory=None, huge_pages=False): """Figure out how to configure the plasma object store. @@ -1029,7 +1029,7 @@ def determine_plasma_store_config(object_store_memory=None, values will be preserved. Args: - object_store_memory (int): The user-specified object store memory + object_store_memory_bytes (int): The user-specified object store memory parameter. plasma_directory (str): The user-specified plasma directory parameter. huge_pages (bool): The user-specified huge pages parameter. @@ -1042,16 +1042,17 @@ def determine_plasma_store_config(object_store_memory=None, system_memory = ray.utils.get_system_memory() # Choose a default object store size. - if object_store_memory is None: - object_store_memory = int(system_memory * 0.4) + if object_store_memory_bytes is None: + object_store_memory_bytes = int(system_memory * 0.4) # Cap memory to avoid memory waste and perf issues on large nodes - if object_store_memory > MAX_DEFAULT_MEM: + if object_store_memory_bytes > MAX_DEFAULT_MEM: logger.warning( "Warning: Capping object memory store to {}GB. ".format( MAX_DEFAULT_MEM // 1e9) + - "To increase this further, specify `object_store_memory` " - "when calling ray.init() or ray start.") - object_store_memory = MAX_DEFAULT_MEM + "To increase this further, specify " + "`object_store_memory_bytes` when calling ray.init() or " + "ray start.") + object_store_memory_bytes = MAX_DEFAULT_MEM # Determine which directory to use. By default, use /tmp on MacOS and # /dev/shm on Linux, unless the shared-memory file system is too small, @@ -1061,7 +1062,7 @@ def determine_plasma_store_config(object_store_memory=None, shm_avail = ray.utils.get_shared_memory_bytes() # Compare the requested memory size to the memory available in # /dev/shm. - if shm_avail > object_store_memory: + if shm_avail > object_store_memory_bytes: plasma_directory = "/dev/shm" else: plasma_directory = "/tmp" @@ -1078,20 +1079,21 @@ def determine_plasma_store_config(object_store_memory=None, plasma_directory = "/tmp" # Do some sanity checks. - if object_store_memory > system_memory: + if object_store_memory_bytes > system_memory: raise Exception( "The requested object store memory size is greater " "than the total available memory.") else: plasma_directory = os.path.abspath(plasma_directory) - logger.warning("WARNING: object_store_memory is not verified when " - "plasma_directory is set.") + logger.warning( + "WARNING: object_store_memory_bytes is not verified when " + "plasma_directory is set.") if not os.path.isdir(plasma_directory): raise Exception("The file {} does not exist or is not a directory." .format(plasma_directory)) - return object_store_memory, plasma_directory + return object_store_memory_bytes, plasma_directory def start_plasma_store(node_ip_address, @@ -1099,7 +1101,7 @@ def start_plasma_store(node_ip_address, object_manager_port=None, store_stdout_file=None, store_stderr_file=None, - object_store_memory=None, + object_store_memory_bytes=None, cleanup=True, plasma_directory=None, huge_pages=False, @@ -1117,7 +1119,7 @@ def start_plasma_store(node_ip_address, to. If no redirection should happen, then this should be None. store_stderr_file: A file handle opened for writing to redirect stderr to. If no redirection should happen, then this should be None. 
- object_store_memory: The amount of memory (in bytes) to start the + object_store_memory_bytes: The amount of memory (in bytes) to start the object store with. cleanup (bool): True if using Ray in local mode. If cleanup is true, then this process will be killed by serices.cleanup() when the @@ -1131,16 +1133,18 @@ def start_plasma_store(node_ip_address, Return: The Plasma store socket name. """ - object_store_memory, plasma_directory = determine_plasma_store_config( - object_store_memory, plasma_directory, huge_pages) + (object_store_memory_bytes, + plasma_directory) = determine_plasma_store_config( + object_store_memory_bytes, plasma_directory, huge_pages) # Print the object store memory using two decimal places. - object_store_memory_str = (object_store_memory / 10**7) / 10**2 + object_store_memory_bytes_str = (object_store_memory_bytes / 10**7) / 10**2 logger.info("Starting the Plasma object store with {} GB memory " - "using {}.".format(object_store_memory_str, plasma_directory)) + "using {}.".format(object_store_memory_bytes_str, + plasma_directory)) # Start the Plasma store. plasma_store_name, p1 = ray.plasma.start_plasma_store( - plasma_store_memory=object_store_memory, + plasma_store_memory=object_store_memory_bytes, use_profiler=RUN_PLASMA_STORE_PROFILER, stdout_file=store_stdout_file, stderr_file=store_stderr_file, @@ -1284,8 +1288,8 @@ def start_ray_processes(address_info=None, redis_shard_ports=None, num_workers=None, num_local_schedulers=1, - object_store_memory=None, - redis_max_memory=None, + object_store_memory_bytes=None, + redis_max_memory_bytes=None, collect_profiling_data=True, num_redis_shards=1, redis_max_clients=None, @@ -1331,16 +1335,16 @@ def start_ray_processes(address_info=None, stores until there are num_local_schedulers existing instances of each, including ones already registered with the given address_info. - object_store_memory: The amount of memory (in bytes) to start the + object_store_memory_bytes: The amount of memory (in bytes) to start the object store with. - redis_max_memory: The max amount of memory (in bytes) to allow redis - to use, or None for no limit. Once the limit is exceeded, redis - will start LRU eviction of entries. This only applies to the + redis_max_memory_bytes: The max amount of memory (in bytes) to allow + redis to use, or None for no limit. Once the limit is exceeded, + redis will start LRU eviction of entries. This only applies to the sharded redis tables (task and object tables). collect_profiling_data: Whether to collect profiling data. Note that profiling data cannot be LRU evicted, so if you set - redis_max_memory then profiling will also be disabled to prevent - it from consuming all available redis memory. + redis_max_memory_bytes then profiling will also be disabled to + prevent it from consuming all available redis memory. num_redis_shards: The number of Redis shards to start in addition to the primary Redis shard. 
redis_max_clients: If provided, attempt to configure Redis with this @@ -1433,7 +1437,7 @@ def start_ray_processes(address_info=None, redirect_worker_output=redirect_worker_output, cleanup=cleanup, password=redis_password, - redis_max_memory=redis_max_memory) + redis_max_memory_bytes=redis_max_memory_bytes) address_info["redis_address"] = redis_address time.sleep(0.1) @@ -1505,7 +1509,7 @@ def start_ray_processes(address_info=None, redis_address, store_stdout_file=plasma_store_stdout_file, store_stderr_file=plasma_store_stderr_file, - object_store_memory=object_store_memory, + object_store_memory_bytes=object_store_memory_bytes, cleanup=cleanup, plasma_directory=plasma_directory, huge_pages=huge_pages, @@ -1556,7 +1560,7 @@ def start_ray_node(node_ip_address, node_manager_ports=None, num_workers=None, num_local_schedulers=1, - object_store_memory=None, + object_store_memory_bytes=None, redis_password=None, worker_path=None, cleanup=True, @@ -1586,8 +1590,8 @@ def start_ray_node(node_ip_address, num_workers (int): The number of workers to start. num_local_schedulers (int): The number of local schedulers to start. This is also the number of plasma stores and raylets to start. - object_store_memory (int): The maximum amount of memory (in bytes) to - let the plasma store use. + object_store_memory_bytes (int): The maximum amount of memory (in + bytes) to let the plasma store use. redis_password (str): Prevents external clients without the password from connecting to Redis if provided. worker_path (str): The path of the source code that will be run by the @@ -1628,7 +1632,7 @@ def start_ray_node(node_ip_address, node_ip_address=node_ip_address, num_workers=num_workers, num_local_schedulers=num_local_schedulers, - object_store_memory=object_store_memory, + object_store_memory_bytes=object_store_memory_bytes, redis_password=redis_password, worker_path=worker_path, include_log_monitor=True, @@ -1652,8 +1656,8 @@ def start_ray_head(address_info=None, redis_shard_ports=None, num_workers=None, num_local_schedulers=1, - object_store_memory=None, - redis_max_memory=None, + object_store_memory_bytes=None, + redis_max_memory_bytes=None, collect_profiling_data=True, worker_path=None, cleanup=True, @@ -1698,11 +1702,11 @@ def start_ray_head(address_info=None, stores until there are at least num_local_schedulers existing instances of each, including ones already registered with the given address_info. - object_store_memory: The amount of memory (in bytes) to start the + object_store_memory_bytes: The amount of memory (in bytes) to start the object store with. - redis_max_memory: The max amount of memory (in bytes) to allow redis - to use, or None for no limit. Once the limit is exceeded, redis - will start LRU eviction of entries. This only applies to the + redis_max_memory_bytes: The max amount of memory (in bytes) to allow + redis to use, or None for no limit. Once the limit is exceeded, + redis will start LRU eviction of entries. This only applies to the sharded redis tables (task and object tables). collect_profiling_data: Whether to collect profiling data from workers. 
worker_path (str): The path of the source code that will be run by the @@ -1754,8 +1758,8 @@ def start_ray_head(address_info=None, redis_shard_ports=redis_shard_ports, num_workers=num_workers, num_local_schedulers=num_local_schedulers, - object_store_memory=object_store_memory, - redis_max_memory=redis_max_memory, + object_store_memory_bytes=object_store_memory_bytes, + redis_max_memory_bytes=redis_max_memory_bytes, collect_profiling_data=collect_profiling_data, worker_path=worker_path, cleanup=cleanup, diff --git a/python/ray/test/cluster_utils.py b/python/ray/test/cluster_utils.py index aff302efc434..74e01449de7e 100644 --- a/python/ray/test/cluster_utils.py +++ b/python/ray/test/cluster_utils.py @@ -61,7 +61,7 @@ def add_node(self, **override_kwargs): All nodes are by default started with the following settings: cleanup=True, resources={"CPU": 1}, - object_store_memory=100 * (2**20) # 100 MB + object_store_memory_bytes=100 * (2**20) # 100 MB Args: override_kwargs: Keyword arguments used in `start_ray_head` @@ -75,7 +75,7 @@ def add_node(self, **override_kwargs): "resources": { "CPU": 1 }, - "object_store_memory": 100 * (2**20) # 100 MB + "object_store_memory_bytes": 100 * (2**20) # 100 MB } node_kwargs.update(override_kwargs) diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py index 84457ff8d9e9..cb046b6d4d61 100644 --- a/python/ray/tune/trial_runner.py +++ b/python/ray/tune/trial_runner.py @@ -230,7 +230,7 @@ def _memory_debug_string(self): "unexpected crashes. Consider " "reducing the memory used by your application " "or reducing the Ray object store size by setting " - "`object_store_memory` when calling `ray.init`.") + "`object_store_memory_bytes` when calling `ray.init`.") else: warn = "" return "Memory usage on this node: {}/{} GB{}".format( diff --git a/python/ray/worker.py b/python/ray/worker.py index 9cdf3bbc9cd8..f9468951435f 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1274,8 +1274,8 @@ def _init(address_info=None, object_id_seed=None, num_workers=None, num_local_schedulers=None, - object_store_memory=None, - redis_max_memory=None, + object_store_memory_bytes=None, + redis_max_memory_bytes=None, collect_profiling_data=True, local_mode=False, driver_mode=None, @@ -1319,11 +1319,11 @@ def _init(address_info=None, manner. However, the same ID should not be used for different jobs. num_local_schedulers (int): The number of local schedulers to start. This is only provided if start_ray_local is True. - object_store_memory: The maximum amount of memory (in bytes) to + object_store_memory_bytes: The maximum amount of memory (in bytes) to allow the object store to use. - redis_max_memory: The max amount of memory (in bytes) to allow redis - to use, or None for no limit. Once the limit is exceeded, redis - will start LRU eviction of entries. This only applies to the + redis_max_memory_bytes: The max amount of memory (in bytes) to allow + redis to use, or None for no limit. Once the limit is exceeded, + redis will start LRU eviction of entries. This only applies to the sharded redis tables (task and object tables). collect_profiling_data: Whether to collect profiling data from workers. 
local_mode (bool): True if the code should be executed serially @@ -1380,10 +1380,10 @@ def _init(address_info=None, else: driver_mode = SCRIPT_MODE - if redis_max_memory and collect_profiling_data: + if redis_max_memory_bytes and collect_profiling_data: logger.warning( "Profiling data cannot be LRU evicted, so it is disabled " - "when redis_max_memory is set.") + "when redis_max_memory_bytes is set.") collect_profiling_data = False # Get addresses of existing services. @@ -1424,8 +1424,8 @@ def _init(address_info=None, node_ip_address=node_ip_address, num_workers=num_workers, num_local_schedulers=num_local_schedulers, - object_store_memory=object_store_memory, - redis_max_memory=redis_max_memory, + object_store_memory_bytes=object_store_memory_bytes, + redis_max_memory_bytes=redis_max_memory_bytes, collect_profiling_data=collect_profiling_data, redirect_worker_output=redirect_worker_output, redirect_output=redirect_output, @@ -1464,12 +1464,12 @@ def _init(address_info=None, if redis_max_clients is not None: raise Exception("When connecting to an existing cluster, " "redis_max_clients must not be provided.") - if object_store_memory is not None: + if object_store_memory_bytes is not None: raise Exception("When connecting to an existing cluster, " - "object_store_memory must not be provided.") - if redis_max_memory is not None: + "object_store_memory_bytes must not be provided.") + if redis_max_memory_bytes is not None: raise Exception("When connecting to an existing cluster, " - "redis_max_memory must not be provided.") + "redis_max_memory_bytes must not be provided.") if plasma_directory is not None: raise Exception("When connecting to an existing cluster, " "plasma_directory must not be provided.") @@ -1529,8 +1529,8 @@ def init(redis_address=None, num_cpus=None, num_gpus=None, resources=None, - object_store_memory=None, - redis_max_memory=None, + object_store_memory_bytes=None, + redis_max_memory_bytes=None, collect_profiling_data=True, node_ip_address=None, object_id_seed=None, @@ -1586,11 +1586,11 @@ def init(redis_address=None, be configured with. resources: A dictionary mapping the name of a resource to the quantity of that resource available. - object_store_memory: The amount of memory (in bytes) to start the + object_store_memory_bytes: The amount of memory (in bytes) to start the object store with. - redis_max_memory: The max amount of memory (in bytes) to allow redis - to use, or None for no limit. Once the limit is exceeded, redis - will start LRU eviction of entries. This only applies to the + redis_max_memory_bytes: The max amount of memory (in bytes) to allow + redis to use, or None for no limit. Once the limit is exceeded, + redis will start LRU eviction of entries. This only applies to the sharded redis tables (task and object tables). collect_profiling_data: Whether to collect profiling data from workers. node_ip_address (str): The IP address of the node that we are on. 
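On the driver side, the renamed keyword arguments are passed straight to `ray.init`. A minimal sketch, assuming this patch is applied; the byte values below are illustrative only.

import ray

ray.init(
    num_cpus=4,
    object_store_memory_bytes=10**9,     # 1 GB plasma object store
    redis_max_memory_bytes=500 * 10**6,  # LRU-evict shard entries past ~500 MB
)
# Because redis_max_memory_bytes is set, _init logs a warning and disables
# profiling data collection, as shown in the hunk above.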
@@ -1692,8 +1692,8 @@ def init(redis_address=None, plasma_directory=plasma_directory, huge_pages=huge_pages, include_webui=include_webui, - object_store_memory=object_store_memory, - redis_max_memory=redis_max_memory, + object_store_memory_bytes=object_store_memory_bytes, + redis_max_memory_bytes=redis_max_memory_bytes, collect_profiling_data=collect_profiling_data, driver_id=driver_id, plasma_store_socket_name=plasma_store_socket_name, @@ -2408,7 +2408,11 @@ def put(value, worker=global_worker): return object_id -def wait(object_ids, num_returns=1, timeout=None, worker=global_worker): +def wait(object_ids, + num_returns=1, + timeout_milliseconds=None, + timeout=None, + worker=global_worker): """Return a list of IDs that are ready and a list of IDs that are not. If timeout is set, the function returns either when the requested number of @@ -2430,13 +2434,17 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker): object_ids (List[ObjectID]): List of object IDs for objects that may or may not be ready. Note that these IDs must be unique. num_returns (int): The number of object IDs that should be returned. - timeout (int): The maximum amount of time in milliseconds to wait - before returning. + timeout_milliseconds (int): The maximum amount of time in milliseconds + to wait before returning. Returns: A list of object IDs that are ready and a list of the remaining object IDs. """ + if timeout is not None: + logger.warning("WARNING: The timeout argument has been deprecated. " + "Please use timeout_milliseconds.") + timeout_milliseconds = timeout if isinstance(object_ids, ray.ObjectID): raise TypeError( @@ -2480,9 +2488,11 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker): with worker.state_lock: current_task_id = worker.get_current_thread_task_id() - timeout = timeout if timeout is not None else 2**30 + timeout_milliseconds = (timeout_milliseconds + if timeout_milliseconds is not None else 2**30) ready_ids, remaining_ids = worker.raylet_client.wait( - object_ids, num_returns, timeout, False, current_task_id) + object_ids, num_returns, timeout_milliseconds, False, + current_task_id) return ready_ids, remaining_ids diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 958fecef506a..da1de4e37cdb 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1676,7 +1676,7 @@ void NodeManager::HandleTaskReconstruction(const TaskID &task_id) { "evicted " << "by the redis LRU configuration. Consider increasing the memory " "allocation via " - << "ray.init(redis_max_memory=)."; + << "ray.init(redis_max_memory_bytes=)."; // Use a copy of the cached task spec to re-execute the task. const Task task = lineage_cache_.GetTask(task_id); ResubmitTask(task); diff --git a/test/actor_test.py b/test/actor_test.py index 9a42842d68ed..06eb8c83d1f7 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -735,7 +735,7 @@ def method(self): pass f = Foo.remote() - ready_ids, _ = ray.wait([f.method.remote()], timeout=100) + ready_ids, _ = ray.wait([f.method.remote()], timeout_milliseconds=100) assert ready_ids == [] @@ -822,7 +822,8 @@ def get_location_and_ids(self): # Creating a new actor should fail because all of the GPUs are being # used. 
a = Actor1.remote() - ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10) + ready_ids, _ = ray.wait( + [a.get_location_and_ids.remote()], timeout_milliseconds=10) assert ready_ids == [] @@ -863,7 +864,8 @@ def get_location_and_ids(self): # Creating a new actor should fail because all of the GPUs are being # used. a = Actor1.remote() - ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10) + ready_ids, _ = ray.wait( + [a.get_location_and_ids.remote()], timeout_milliseconds=10) assert ready_ids == [] # We should be able to create more actors that use only a single GPU. @@ -892,7 +894,8 @@ def get_location_and_ids(self): # Creating a new actor should fail because all of the GPUs are being # used. a = Actor2.remote() - ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10) + ready_ids, _ = ray.wait( + [a.get_location_and_ids.remote()], timeout_milliseconds=10) assert ready_ids == [] @@ -932,7 +935,8 @@ def get_location_and_ids(self): # Creating a new actor should fail because all of the GPUs are being # used. a = Actor1.remote() - ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10) + ready_ids, _ = ray.wait( + [a.get_location_and_ids.remote()], timeout_milliseconds=10) assert ready_ids == [] @@ -1010,7 +1014,8 @@ def get_location_and_ids(self): # All the GPUs should be used up now. a = Actor.remote() - ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10) + ready_ids, _ = ray.wait( + [a.get_location_and_ids.remote()], timeout_milliseconds=10) assert ready_ids == [] @@ -1154,7 +1159,7 @@ def locations_to_intervals_for_many_tasks(): # Now if we run some GPU tasks, they should not be scheduled. results = [f1.remote() for _ in range(30)] - ready_ids, remaining_ids = ray.wait(results, timeout=1000) + ready_ids, remaining_ids = ray.wait(results, timeout_milliseconds=1000) assert len(ready_ids) == 0 @@ -1263,7 +1268,7 @@ def blocking_method(self): # block. actor = CPUFoo.remote() x_id = actor.blocking_method.remote() - ready_ids, remaining_ids = ray.wait([x_id], timeout=1000) + ready_ids, remaining_ids = ray.wait([x_id], timeout_milliseconds=1000) assert ready_ids == [] assert remaining_ids == [x_id] @@ -1278,7 +1283,7 @@ def blocking_method(self): # Make sure that GPU resources are not released when actors block. actor = GPUFoo.remote() x_id = actor.blocking_method.remote() - ready_ids, remaining_ids = ray.wait([x_id], timeout=1000) + ready_ids, remaining_ids = ray.wait([x_id], timeout_milliseconds=1000) assert ready_ids == [] assert remaining_ids == [x_id] @@ -2013,7 +2018,7 @@ def method(self): actor2s = [Actor2.remote() for _ in range(2)] results = [a.method.remote() for a in actor2s] ready_ids, remaining_ids = ray.wait( - results, num_returns=len(results), timeout=1000) + results, num_returns=len(results), timeout_milliseconds=1000) assert len(ready_ids) == 1 @@ -2074,7 +2079,7 @@ def method(self): ray.wait([result2]) actor3 = ResourceActor1.remote() result3 = actor3.method.remote() - ready_ids, _ = ray.wait([result3], timeout=200) + ready_ids, _ = ray.wait([result3], timeout_milliseconds=200) assert len(ready_ids) == 0 # By deleting actor1, we free up resources to create actor3. 
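The test changes above all route through the renamed `timeout_milliseconds` keyword. As a quick illustration of the intended semantics, here is a hedged sketch (task durations and the 500 ms timeout are hypothetical): `ray.wait` returns as soon as `num_returns` objects are ready or the timeout expires, whichever comes first.

import time

import ray

ray.init(num_cpus=2)

@ray.remote
def sleepy(seconds):
    time.sleep(seconds)
    return seconds

oids = [sleepy.remote(2.0), sleepy.remote(0.01)]
# Ask for both results but give up after 500 ms; normally only the fast
# task has finished by then.
ready, not_ready = ray.wait(oids, num_returns=2, timeout_milliseconds=500)
print(len(ready), len(not_ready))  # usually prints: 1 1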
@@ -2108,9 +2113,9 @@ def __init__(self): def create_object(self, size): return np.random.rand(size) - object_store_memory = 10**8 + object_store_memory_bytes = 10**8 ray.init( - object_store_memory=object_store_memory, + object_store_memory_bytes=object_store_memory_bytes, _internal_config=json.dumps({ "initial_reconstruction_timeout_milliseconds": 200 })) @@ -2121,7 +2126,7 @@ def create_object(self, size): objects = [] num_objects = 20 for _ in range(num_objects): - obj = a.create_object.remote(object_store_memory // num_objects) + obj = a.create_object.remote(object_store_memory_bytes // num_objects) objects.append(obj) # Get each object once to make sure each object gets created. ray.get(obj) diff --git a/test/component_failures_test.py b/test/component_failures_test.py index 30071b3c1917..eda03d994050 100644 --- a/test/component_failures_test.py +++ b/test/component_failures_test.py @@ -87,7 +87,7 @@ def f(id_in_a_list): time.sleep(1) # Make sure the task hasn't finished. - ready_ids, _ = ray.wait([result_id], timeout=0) + ready_ids, _ = ray.wait([result_id], timeout_milliseconds=0) assert len(ready_ids) == 0 # Kill the worker. @@ -95,7 +95,7 @@ def f(id_in_a_list): time.sleep(0.1) # Make sure the sleep task hasn't finished. - ready_ids, _ = ray.wait([x_id], timeout=0) + ready_ids, _ = ray.wait([x_id], timeout_milliseconds=0) assert len(ready_ids) == 0 # Seal the object so the store attempts to notify the worker that the # get has been fulfilled. @@ -138,7 +138,7 @@ def sleep_forever(): time.sleep(0.1) # Make sure the original task hasn't finished. - ready_ids, _ = ray.wait([x_id], timeout=0) + ready_ids, _ = ray.wait([x_id], timeout_milliseconds=0) assert len(ready_ids) == 0 # Seal the object so the store attempts to notify the worker that the # get has been fulfilled. @@ -223,7 +223,7 @@ def sleep_forever(): time.sleep(0.1) # Make sure the original task hasn't finished. - ready_ids, _ = ray.wait([x_id], timeout=0) + ready_ids, _ = ray.wait([x_id], timeout_milliseconds=0) assert len(ready_ids) == 0 # Seal the object so the store attempts to notify the worker that the # wait can return. @@ -401,7 +401,7 @@ def ping(self): ready, _ = ray.wait( children_out, num_returns=len(children_out), - timeout=5 * 60 * 1000) + timeout_milliseconds=5 * 60 * 1000) assert len(ready) == len(children_out) # Replace any actors that died. diff --git a/test/failure_test.py b/test/failure_test.py index 3efb9bc69a7a..f65bd2fdfa0a 100644 --- a/test/failure_test.py +++ b/test/failure_test.py @@ -329,7 +329,7 @@ def consume(x): pass a = Actor.remote() - [obj], _ = ray.wait([a.kill.remote()], timeout=5000) + [obj], _ = ray.wait([a.kill.remote()], timeout_milliseconds=5000) with pytest.raises(Exception): ray.get(obj) with pytest.raises(Exception): @@ -388,17 +388,17 @@ class Actor(object): @pytest.fixture -def ray_start_object_store_memory(): +def ray_start_object_store_memory_bytes(): # Start the Ray processes. store_size = 10**6 - ray.init(num_cpus=1, object_store_memory=store_size) + ray.init(num_cpus=1, object_store_memory_bytes=store_size) yield None # The code after the yield will run as teardown code. 
ray.shutdown() @pytest.mark.skip("This test does not work yet.") -def test_put_error1(ray_start_object_store_memory): +def test_put_error1(ray_start_object_store_memory_bytes): num_objects = 3 object_size = 4 * 10**5 @@ -440,7 +440,7 @@ def put_arg_task(): @pytest.mark.skip("This test does not work yet.") -def test_put_error2(ray_start_object_store_memory): +def test_put_error2(ray_start_object_store_memory_bytes): # This is the same as the previous test, but it calls ray.put directly. num_objects = 3 object_size = 4 * 10**5 diff --git a/test/object_manager_test.py b/test/object_manager_test.py index 928d0dcd8a09..956c1657f13f 100644 --- a/test/object_manager_test.py +++ b/test/object_manager_test.py @@ -21,7 +21,8 @@ def create_cluster(num_nodes): cluster = Cluster() for i in range(num_nodes): - cluster.add_node(resources={str(i): 100}, object_store_memory=10**9) + cluster.add_node( + resources={str(i): 100}, object_store_memory_bytes=10**9) ray.init(redis_address=cluster.redis_address) return cluster diff --git a/test/runtest.py b/test/runtest.py index 91862023bb5d..e07ebbaa2442 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -771,7 +771,8 @@ def g(): args=[], num_cpus=1, num_gpus=1, resources={"Custom": 1})) == [0] infeasible_id = g._remote(args=[], resources={"NonexistentCustom": 1}) - ready_ids, remaining_ids = ray.wait([infeasible_id], timeout=50) + ready_ids, remaining_ids = ray.wait( + [infeasible_id], timeout_milliseconds=50) assert len(ready_ids) == 0 assert len(remaining_ids) == 1 @@ -846,14 +847,15 @@ def f(delay): objectids = [f.remote(0.5), f.remote(0.5), f.remote(0.5), f.remote(0.5)] start_time = time.time() - ready_ids, remaining_ids = ray.wait(objectids, timeout=1750, num_returns=4) + ready_ids, remaining_ids = ray.wait( + objectids, timeout_milliseconds=1750, num_returns=4) assert time.time() - start_time < 2 assert len(ready_ids) == 3 assert len(remaining_ids) == 1 ray.wait(objectids) objectids = [f.remote(1.0), f.remote(0.5), f.remote(0.5), f.remote(0.5)] start_time = time.time() - ready_ids, remaining_ids = ray.wait(objectids, timeout=5000) + ready_ids, remaining_ids = ray.wait(objectids, timeout_milliseconds=5000) assert time.time() - start_time < 5 assert len(ready_ids) == 1 assert len(remaining_ids) == 3 @@ -1074,7 +1076,8 @@ def test_object_transfer_dump(ray_start_cluster): num_nodes = 3 for i in range(num_nodes): - cluster.add_node(resources={str(i): 1}, object_store_memory=10**9) + cluster.add_node( + resources={str(i): 1}, object_store_memory_bytes=10**9) ray.init(redis_address=cluster.redis_address) @@ -1326,13 +1329,13 @@ def run_one_test(actors, local_only): ] # Case 1: run this local_only=False. All 3 objects will be deleted. (a, b, c) = run_one_test(actors, False) - (l1, l2) = ray.wait([a, b, c], timeout=10, num_returns=1) + (l1, l2) = ray.wait([a, b, c], timeout_milliseconds=10, num_returns=1) # All the objects are deleted. assert len(l1) == 0 assert len(l2) == 3 # Case 2: run this local_only=True. Only 1 object will be deleted. (a, b, c) = run_one_test(actors, True) - (l1, l2) = ray.wait([a, b, c], timeout=10, num_returns=3) + (l1, l2) = ray.wait([a, b, c], timeout_milliseconds=10, num_returns=3) # One object is deleted and 2 objects are not. 
assert len(l1) == 2 assert len(l2) == 1 @@ -1381,7 +1384,7 @@ def f(): num_returns = 5 object_ids = [ray.put(i) for i in range(20)] ready, remaining = ray.wait( - object_ids, num_returns=num_returns, timeout=None) + object_ids, num_returns=num_returns, timeout_milliseconds=None) assert_equal(ready, object_ids[:num_returns]) assert_equal(remaining, object_ids[num_returns:]) @@ -1762,7 +1765,7 @@ def method(self): # custom resource. TODO(rkn): Re-enable this once ray.wait is # implemented. f2 = Foo2._remote([], {}, resources={"Custom": 0.7}) - ready, _ = ray.wait([f2.method.remote()], timeout=500) + ready, _ = ray.wait([f2.method.remote()], timeout_milliseconds=500) assert len(ready) == 0 # Make sure we can start an actor that requries only 0.3 of the custom # resource. @@ -1988,7 +1991,8 @@ def k(): # Make sure that tasks with unsatisfied custom resource requirements do # not get scheduled. - ready_ids, remaining_ids = ray.wait([j.remote(), k.remote()], timeout=500) + ready_ids, remaining_ids = ray.wait( + [j.remote(), k.remote()], timeout_milliseconds=500) assert ready_ids == [] @@ -2407,7 +2411,7 @@ def test_initialized_local_mode(shutdown_only_with_initialization_check): def test_wait_reconstruction(shutdown_only): - ray.init(num_cpus=1, object_store_memory=10**8) + ray.init(num_cpus=1, object_store_memory_bytes=10**8) @ray.remote def f(): diff --git a/test/stress_tests.py b/test/stress_tests.py index 3771f58053a9..ddf79c47d145 100644 --- a/test/stress_tests.py +++ b/test/stress_tests.py @@ -194,7 +194,7 @@ def ray_start_reconstruction(request): # Start the Plasma store instances with a total of 1GB memory. plasma_store_memory = 10**9 plasma_addresses = [] - object_store_memory = plasma_store_memory // num_local_schedulers + object_store_memory_bytes = plasma_store_memory // num_local_schedulers for i in range(num_local_schedulers): store_stdout_file, store_stderr_file = ( ray.tempfile_services.new_plasma_store_log_file(i, True)) @@ -202,7 +202,7 @@ def ray_start_reconstruction(request): ray.services.start_plasma_store( node_ip_address, redis_address, - object_store_memory=object_store_memory, + object_store_memory_bytes=object_store_memory_bytes, store_stdout_file=store_stdout_file, store_stderr_file=store_stderr_file)) @@ -497,7 +497,7 @@ def error_check(errors): def ray_start_driver_put_errors(): plasma_store_memory = 10**9 # Start the Ray processes. - ray.init(num_cpus=1, object_store_memory=plasma_store_memory) + ray.init(num_cpus=1, object_store_memory_bytes=plasma_store_memory) yield plasma_store_memory # The code after the yield will run as teardown code. ray.shutdown()
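Both `worker.py` and `scripts.py` keep the old spellings as deprecated aliases: `ray.wait(..., timeout=...)` and the `--object-store-memory`/`--redis-max-memory` CLI flags still work, but they log a deprecation warning and forward their value to the new name. A small sketch of the Python-side shim, assuming this patch; the task body and timeout value are illustrative.

import time

import ray

ray.init()

@ray.remote
def slow():
    time.sleep(1.0)
    return "done"

# Old spelling: still accepted, but logs "The timeout argument has been
# deprecated. Please use timeout_milliseconds." and forwards the value.
ready_old, _ = ray.wait([slow.remote()], timeout=100)

# New spelling introduced by this patch.
ready_new, _ = ray.wait([slow.remote()], timeout_milliseconds=100)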