diff --git a/doc/source/redis-memory-management.rst b/doc/source/redis-memory-management.rst
index 5e6edcc02f6c..cd3fea02c26c 100644
--- a/doc/source/redis-memory-management.rst
+++ b/doc/source/redis-memory-management.rst
@@ -8,8 +8,8 @@ task/object generation rate could risk high memory pressure, potentially
 leading to out-of-memory (OOM) errors.
 
 In Ray `0.6.1+` Redis shards can be configured to LRU evict task and object
-metadata by setting ``redis_max_memory`` when starting Ray. This supercedes the
-previously documented flushing functionality.
+metadata by setting ``redis_max_memory_mb`` when starting Ray. This
+supersedes the previously documented flushing functionality.
 
-Note that profiling is disabled when ``redis_max_memory`` is set. This is because
-profiling data cannot be LRU evicted.
+Note that profiling is disabled when ``redis_max_memory_mb`` is set. This is
+because profiling data cannot be LRU evicted.
diff --git a/python/ray/experimental/api.py b/python/ray/experimental/api.py
index 9891ecff73eb..6cbda987aea0 100644
--- a/python/ray/experimental/api.py
+++ b/python/ray/experimental/api.py
@@ -41,7 +41,7 @@ def get(object_ids, worker=None):
     return ray.get(object_ids, worker)
 
 
-def wait(object_ids, num_returns=1, timeout=None, worker=None):
+def wait(object_ids, num_returns=1, timeout_seconds=None, worker=None):
     """Return a list of IDs that are ready and a list of IDs that are not.
 
     This method is identical to `ray.wait` except it adds support for tuples
@@ -52,7 +52,7 @@ def wait(object_ids, num_returns=1, timeout=None, worker=None):
         List like of object IDs for objects that may or may not be ready.
             Note that these IDs must be unique.
         num_returns (int): The number of object IDs that should be returned.
-        timeout (int): The maximum amount of time in milliseconds to wait
+        timeout_seconds (float): The maximum amount of time in seconds to wait
             before returning.
Returns: @@ -61,6 +61,14 @@ def wait(object_ids, num_returns=1, timeout=None, worker=None): """ worker = ray.worker.global_worker if worker is None else worker if isinstance(object_ids, (tuple, np.ndarray)): - return ray.wait(list(object_ids), num_returns, timeout, worker) + return ray.wait( + list(object_ids), + num_returns=num_returns, + timeout_seconds=timeout_seconds, + worker=worker) - return ray.wait(object_ids, num_returns, timeout, worker) + return ray.wait( + object_ids, + num_returns=num_returns, + timeout_seconds=timeout_seconds, + worker=worker) diff --git a/python/ray/experimental/async_plasma.py b/python/ray/experimental/async_plasma.py index 2c0f806f2467..d14c000ef21f 100644 --- a/python/ray/experimental/async_plasma.py +++ b/python/ray/experimental/async_plasma.py @@ -219,7 +219,7 @@ def as_future(self, object_id, check_ready=True): fut = PlasmaObjectFuture(loop=self._loop, object_id=plain_object_id) if check_ready: - ready, _ = ray.wait([object_id], timeout=0) + ready, _ = ray.wait([object_id], timeout_seconds=0) if ready: if self._loop.get_debug(): logger.debug("%s has been ready.", plain_object_id) diff --git a/python/ray/experimental/sgd/test_sgd.py b/python/ray/experimental/sgd/test_sgd.py index 79e00b2656ba..a55cf1670edb 100755 --- a/python/ray/experimental/sgd/test_sgd.py +++ b/python/ray/experimental/sgd/test_sgd.py @@ -20,7 +20,7 @@ parser.add_argument("--devices-per-worker", default=2, type=int) parser.add_argument("--stats-interval", default=10, type=int) parser.add_argument("--all-reduce-alg", default="simple", type=str) -parser.add_argument("--object-store-memory", default=None, type=int) +parser.add_argument("--object-store-memory-mb", default=None, type=int) parser.add_argument( "--warmup", action="store_true", help="Warm up object store before start.") parser.add_argument( @@ -32,7 +32,7 @@ args, _ = parser.parse_known_args() ray.init( redis_address=args.redis_address, - object_store_memory=args.object_store_memory) + object_store_memory_mb=args.object_store_memory_mb) model_creator = ( lambda worker_idx, device_idx: TFBenchModel( diff --git a/python/ray/memory_monitor.py b/python/ray/memory_monitor.py index a52f98d7077d..2457c4c36391 100644 --- a/python/ray/memory_monitor.py +++ b/python/ray/memory_monitor.py @@ -36,9 +36,9 @@ def get_message(used_gb, total_gb, threshold): "\n\nIn addition, ~{} GB of shared memory is ".format( round(psutil.virtual_memory().shared / 1e9, 2)) + "currently being used by the Ray object store. You can set " - "the object store size with the `object_store_memory` " + "the object store size with the `object_store_memory_mb` " "parameter when starting Ray, and the max Redis size with " - "`redis_max_memory`.") + "`redis_max_memory_mb`.") class MemoryMonitor(object): diff --git a/python/ray/parameter.py b/python/ray/parameter.py index 350b118e0bc4..dcba6f4138b5 100644 --- a/python/ray/parameter.py +++ b/python/ray/parameter.py @@ -40,11 +40,11 @@ class RayParams(object): This is only provided if start_ray_local is True. resources: A dictionary mapping the name of a resource to the quantity of that resource available. - object_store_memory: The amount of memory (in bytes) to start the - object store with. - redis_max_memory: The max amount of memory (in bytes) to allow redis - to use, or None for no limit. Once the limit is exceeded, redis - will start LRU eviction of entries. This only applies to the + object_store_memory_mb: The amount of memory (in megabytes) to start + the object store with. 
+        redis_max_memory_mb: The max amount of memory (in megabytes) to allow
+            redis to use, or None for no limit. Once the limit is exceeded,
+            redis will start LRU eviction of entries. This only applies to the
             sharded redis tables (task and object tables).
         object_manager_ports (list): A list of the ports to use for the object
             managers. There should be one per object manager being started on
@@ -100,8 +100,8 @@ def __init__(self,
                  num_gpus=None,
                  num_local_schedulers=None,
                  resources=None,
-                 object_store_memory=None,
-                 redis_max_memory=None,
+                 object_store_memory_mb=None,
+                 redis_max_memory_mb=None,
                  redis_port=None,
                  redis_shard_ports=None,
                  object_manager_ports=None,
@@ -137,8 +137,8 @@ def __init__(self,
         self.num_gpus = num_gpus
         self.num_local_schedulers = num_local_schedulers
         self.resources = resources
-        self.object_store_memory = object_store_memory
-        self.redis_max_memory = redis_max_memory
+        self.object_store_memory_mb = object_store_memory_mb
+        self.redis_max_memory_mb = redis_max_memory_mb
         self.redis_port = redis_port
         self.redis_shard_ports = redis_shard_ports
         self.object_manager_ports = object_manager_ports
diff --git a/python/ray/plasma/plasma.py b/python/ray/plasma/plasma.py
index 53b2434260c8..d7eb2c60db01 100644
--- a/python/ray/plasma/plasma.py
+++ b/python/ray/plasma/plasma.py
@@ -11,8 +11,6 @@
 
 __all__ = ["start_plasma_store", "DEFAULT_PLASMA_STORE_MEMORY"]
 
-PLASMA_WAIT_TIMEOUT = 2**30
-
 DEFAULT_PLASMA_STORE_MEMORY = 10**9
diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py
index fc89d48ed63c..d05702523c59 100644
--- a/python/ray/ray_constants.py
+++ b/python/ray/ray_constants.py
@@ -82,3 +82,6 @@ def env_integer(key, default):
 NO_RECONSTRUCTION = 0
 # A constant indicating that an actor should be reconstructed infinite times.
 INFINITE_RECONSTRUCTION = 2**30
+
+# Max megabytes to allocate to plasma unless overridden by the user
+DEFAULT_MAX_MEMORY_MB = 20 * 1000
diff --git a/python/ray/rllib/evaluation/metrics.py b/python/ray/rllib/evaluation/metrics.py
index 1b270be3738c..d813c4ff09d8 100644
--- a/python/ray/rllib/evaluation/metrics.py
+++ b/python/ray/rllib/evaluation/metrics.py
@@ -32,7 +32,7 @@ def collect_episodes(local_evaluator,
         for a in remote_evaluators
     ]
     collected, _ = ray.wait(
-        pending, num_returns=len(pending), timeout=timeout_seconds * 1000)
+        pending, num_returns=len(pending), timeout_seconds=timeout_seconds)
     num_metric_batches_dropped = len(pending) - len(collected)
     metric_lists = ray.get(collected)
diff --git a/python/ray/rllib/train.py b/python/ray/rllib/train.py
index 5e03dfa588e4..c9137e5ed906 100755
--- a/python/ray/rllib/train.py
+++ b/python/ray/rllib/train.py
@@ -57,15 +57,15 @@ def create_parser(parser_creator=None):
         type=int,
         help="Emulate multiple cluster nodes for debugging.")
     parser.add_argument(
-        "--ray-redis-max-memory",
+        "--ray-redis-max-memory-mb",
         default=None,
         type=int,
-        help="--redis-max-memory to use if starting a new cluster.")
+        help="--redis-max-memory-mb to use if starting a new cluster.")
     parser.add_argument(
-        "--ray-object-store-memory",
+        "--ray-object-store-memory-mb",
         default=None,
         type=int,
-        help="--object-store-memory to use if starting a new cluster.")
+        help="--object-store-memory-mb to use if starting a new cluster.")
     parser.add_argument(
         "--experiment-name",
         default="default",
@@ -130,14 +130,14 @@ def run(args, parser):
                 "num_cpus": args.ray_num_cpus or 1,
                 "num_gpus": args.ray_num_gpus or 0,
             },
-            object_store_memory=args.ray_object_store_memory,
-            redis_max_memory=args.ray_redis_max_memory)
+            object_store_memory_mb=args.ray_object_store_memory_mb,
+            redis_max_memory_mb=args.ray_redis_max_memory_mb)
         ray.init(redis_address=cluster.redis_address)
     else:
         ray.init(
             redis_address=args.redis_address,
-            object_store_memory=args.ray_object_store_memory,
-            redis_max_memory=args.ray_redis_max_memory,
+            object_store_memory_mb=args.ray_object_store_memory_mb,
+            redis_max_memory_mb=args.ray_redis_max_memory_mb,
             num_cpus=args.ray_num_cpus,
             num_gpus=args.ray_num_gpus)
     run_experiments(
diff --git a/python/ray/rllib/utils/actors.py b/python/ray/rllib/utils/actors.py
index 689aa945cabf..55da12ef2c72 100644
--- a/python/ray/rllib/utils/actors.py
+++ b/python/ray/rllib/utils/actors.py
@@ -28,7 +28,8 @@ def add(self, worker, all_obj_ids):
     def completed(self):
         pending = list(self._tasks)
         if pending:
-            ready, _ = ray.wait(pending, num_returns=len(pending), timeout=10)
+            ready, _ = ray.wait(
+                pending, num_returns=len(pending), timeout_seconds=0.01)
             for obj_id in ready:
                 yield (self._tasks.pop(obj_id), self._objects.pop(obj_id))
diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py
index 492eac51ec8a..5553bf73e4ee 100644
--- a/python/ray/scripts/scripts.py
+++ b/python/ray/scripts/scripts.py
@@ -113,27 +113,39 @@ def cli(logging_level, logging_format):
     type=int,
     help="the port to use for starting the node manager")
 @click.option(
-    "--object-store-memory",
+    "--object-store-memory-mb",
     required=False,
     type=int,
-    help="the maximum amount of memory (in bytes) to allow the "
-    "object store to use")
+    help="the maximum amount of memory (in megabytes) to allow the "
+    "object store to use")
 @click.option(
-    "--redis-max-memory",
+    "--object-store-memory",
+    required=False,
+    type=int,
+    help="--object-store-memory is deprecated, please use "
+    "--object-store-memory-mb")
+@click.option(
+    "--redis-max-memory-mb",
     required=False,
     type=int,
-    help=("The max amount of memory (in bytes) to allow redis to use, or None "
+    help=("The max amount of memory (in megabytes) to allow redis to use, or None "
           "for no limit. Once the limit is exceeded, redis will start LRU "
           "eviction of entries. This only applies to the sharded "
           "redis tables (task and object tables)."))
+@click.option(
+    "--redis-max-memory",
+    required=False,
+    type=int,
+    help="--redis-max-memory is deprecated, please use "
+    "--redis-max-memory-mb")
 @click.option(
     "--collect-profiling-data",
     default=True,
     type=bool,
-    help=("Whether to collect profiling data. Note that "
-          "profiling data cannot be LRU evicted, so if you set "
-          "redis_max_memory then profiling will also be disabled to prevent "
-          "it from consuming all available redis memory."))
+    help="Whether to collect profiling data. Note that profiling data cannot "
+    "be LRU evicted, so if you set redis_max_memory_mb then profiling will "
+    "also be disabled to prevent it from consuming all available redis "
+    "memory.")
 @click.option(
     "--num-workers",
     required=False,
@@ -219,12 +231,24 @@ def cli(logging_level, logging_format):
     help="Do NOT use this. 
This is for debugging/development purposes ONLY.") def start(node_ip_address, redis_address, redis_port, num_redis_shards, redis_max_clients, redis_password, redis_shard_ports, - object_manager_port, node_manager_port, object_store_memory, - redis_max_memory, collect_profiling_data, num_workers, num_cpus, - num_gpus, resources, head, no_ui, block, plasma_directory, - huge_pages, autoscaling_config, no_redirect_worker_output, - no_redirect_output, plasma_store_socket_name, raylet_socket_name, - temp_dir, internal_config): + object_manager_port, node_manager_port, object_store_memory_mb, + object_store_memory, redis_max_memory_mb, redis_max_memory, + collect_profiling_data, num_workers, num_cpus, num_gpus, resources, + head, no_ui, block, plasma_directory, huge_pages, autoscaling_config, + no_redirect_worker_output, no_redirect_output, + plasma_store_socket_name, raylet_socket_name, temp_dir, + internal_config): + if object_store_memory is not None: + logger.warning("WARNING: The '--object-store-memory' argument has " + "been deprecated. Please use " + "'--object-store-memory-mb'.") + object_store_memory_mb = object_store_memory / 10**6 + + if redis_max_memory is not None: + logger.warning("WARNING: The '--redis-max-memory' argument has been " + "deprecated. Please use '--redis-max-memory-mb'.") + redis_max_memory_mb = redis_max_memory / 10**6 + # Convert hostnames to numerical IP address. if node_ip_address is not None: node_ip_address = services.address_to_ip(node_ip_address) @@ -250,7 +274,7 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, object_manager_ports=[object_manager_port], node_manager_ports=[node_manager_port], num_workers=num_workers, - object_store_memory=object_store_memory, + object_store_memory_mb=object_store_memory_mb, redis_password=redis_password, redirect_worker_output=not no_redirect_worker_output, redirect_output=not no_redirect_output, @@ -291,7 +315,7 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards, ray_params.update_if_absent( redis_port=redis_port, redis_shard_ports=redis_shard_ports, - redis_max_memory=redis_max_memory, + redis_max_memory_mb=redis_max_memory_mb, collect_profiling_data=collect_profiling_data, num_redis_shards=num_redis_shards, redis_max_clients=redis_max_clients, diff --git a/python/ray/services.py b/python/ray/services.py index 0838101567f9..7f109381de16 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -36,9 +36,6 @@ PROCESS_TYPE_REDIS_SERVER = "redis_server" PROCESS_TYPE_WEB_UI = "web_ui" -# Max bytes to allocate to plasma unless overriden by the user -MAX_DEFAULT_MEM = 20 * 1000 * 1000 * 1000 - # This is a dictionary tracking all of the processes of different types that # have been started by this services module. Note that the order of the keys is # important because it determines the order in which these processes will be @@ -417,7 +414,7 @@ def start_redis(node_ip_address, cleanup=True, password=None, use_credis=None, - redis_max_memory=None): + redis_max_memory_mb=None): """Start the Redis global state store. Args: @@ -446,9 +443,9 @@ def start_redis(node_ip_address, use_credis: If True, additionally load the chain-replicated libraries into the redis servers. Defaults to None, which means its value is set by the presence of "RAY_USE_NEW_GCS" in os.environ. - redis_max_memory: The max amount of memory (in bytes) to allow redis - to use, or None for no limit. Once the limit is exceeded, redis - will start LRU eviction of entries. 
This only applies to the
+        redis_max_memory_mb: The max amount of memory (in megabytes) to allow
+            redis to use, or None for no limit. Once the limit is exceeded,
+            redis will start LRU eviction of entries. This only applies to the
             sharded redis tables (task and object tables).
 
     Returns:
@@ -481,7 +478,7 @@ def start_redis(node_ip_address,
             stderr_file=redis_stderr_file,
             cleanup=cleanup,
             password=password,
-            redis_max_memory=None)
+            redis_max_memory_mb=None)
     else:
         assigned_port, _ = _start_redis_instance(
             node_ip_address=node_ip_address,
@@ -496,7 +493,7 @@ def start_redis(node_ip_address,
             # supplies.
             modules=[CREDIS_MASTER_MODULE, REDIS_MODULE],
             password=password,
-            redis_max_memory=None)
+            redis_max_memory_mb=None)
     if port is not None:
         assert assigned_port == port
     port = assigned_port
@@ -531,7 +528,7 @@ def start_redis(node_ip_address,
                 stderr_file=redis_stderr_file,
                 cleanup=cleanup,
                 password=password,
-                redis_max_memory=redis_max_memory)
+                redis_max_memory_mb=redis_max_memory_mb)
         else:
             assert num_redis_shards == 1, \
                 "For now, RAY_USE_NEW_GCS supports 1 shard, and credis "\
@@ -549,7 +546,7 @@ def start_redis(node_ip_address,
                 # module, as the latter contains an extern declaration that the
                 # former supplies.
                 modules=[CREDIS_MEMBER_MODULE, REDIS_MODULE],
-                redis_max_memory=redis_max_memory)
+                redis_max_memory_mb=redis_max_memory_mb)
 
         if redis_shard_ports[i] is not None:
             assert redis_shard_port == redis_shard_ports[i]
@@ -580,7 +577,7 @@ def _start_redis_instance(node_ip_address="127.0.0.1",
                           password=None,
                           executable=REDIS_EXECUTABLE,
                           modules=None,
-                          redis_max_memory=None):
+                          redis_max_memory_mb=None):
     """Start a single Redis server.
 
     Args:
@@ -604,9 +601,9 @@ def _start_redis_instance(node_ip_address="127.0.0.1",
         modules (list of str): A list of pathnames, pointing to the redis
             module(s) that will be loaded in this redis server. If None, load
             the default Ray redis module.
-        redis_max_memory: The max amount of memory (in bytes) to allow redis
-            to use, or None for no limit. Once the limit is exceeded, redis
-            will start LRU eviction of entries.
+        redis_max_memory_mb: The max amount of memory (in megabytes) to allow
+            redis to use, or None for no limit. Once the limit is exceeded,
+            redis will start LRU eviction of entries.
 
     Returns:
         A tuple of the port used by Redis and a handle to the process that was
@@ -671,12 +668,13 @@ def _start_redis_instance(node_ip_address="127.0.0.1",
         redis_client.config_set("protected-mode", "no")
 
     # Discard old task and object metadata.
-    if redis_max_memory is not None:
-        redis_client.config_set("maxmemory", str(redis_max_memory))
+    if redis_max_memory_mb is not None:
+        redis_client.config_set("maxmemory", str(
+            int(redis_max_memory_mb * 1e6)))
         redis_client.config_set("maxmemory-policy", "allkeys-lru")
         redis_client.config_set("maxmemory-samples", "10")
-        logger.info("Starting Redis shard with {} GB max memory.".format(
-            round(redis_max_memory / 1e9, 2)))
+        logger.info("Starting Redis shard with {}GB max memory.".format(
+            round(redis_max_memory_mb / 1e3, 2)))
 
     # If redis_max_clients is provided, attempt to raise the number of maximum
     # number of Redis clients.
@@ -1006,7 +1004,7 @@ def start_raylet(ray_params,
     return raylet_name
 
 
-def determine_plasma_store_config(object_store_memory=None,
+def determine_plasma_store_config(object_store_memory_mb=None,
                                   plasma_directory=None,
                                   huge_pages=False):
     """Figure out how to configure the plasma object store.
@@ -1019,7 +1017,7 @@
     values will be preserved.
     Args:
-        object_store_memory (int): The user-specified object store memory
+        object_store_memory_mb (int): The user-specified object store memory
             parameter.
         plasma_directory (str): The user-specified plasma directory parameter.
         huge_pages (bool): The user-specified huge pages parameter.
@@ -1029,59 +1027,60 @@ use. If either of these values is specified by the user, then that
         value will be preserved.
     """
-    system_memory = ray.utils.get_system_memory()
+    system_memory_mb = ray.utils.get_system_memory_bytes() / 1e6
 
     # Choose a default object store size.
-    if object_store_memory is None:
-        object_store_memory = int(system_memory * 0.4)
+    if object_store_memory_mb is None:
+        object_store_memory_mb = int(system_memory_mb * 0.4)
         # Cap memory to avoid memory waste and perf issues on large nodes
-        if object_store_memory > MAX_DEFAULT_MEM:
+        if object_store_memory_mb > ray.ray_constants.DEFAULT_MAX_MEMORY_MB:
             logger.warning(
-                "Warning: Capping object memory store to {}GB. ".format(
-                    MAX_DEFAULT_MEM // 1e9) +
-                "To increase this further, specify `object_store_memory` "
-                "when calling ray.init() or ray start.")
-            object_store_memory = MAX_DEFAULT_MEM
+                "Warning: Capping object store memory to {}GB. ".format(
+                    ray.ray_constants.DEFAULT_MAX_MEMORY_MB // 1000) +
+                "To increase this further, specify "
+                "`object_store_memory_mb` when calling ray.init() or ray "
+                "start.")
+            object_store_memory_mb = ray.ray_constants.DEFAULT_MAX_MEMORY_MB
 
     # Determine which directory to use. By default, use /tmp on MacOS and
     # /dev/shm on Linux, unless the shared-memory file system is too small,
     # in which case we default to /tmp on Linux.
     if plasma_directory is None:
         if sys.platform == "linux" or sys.platform == "linux2":
-            shm_avail = ray.utils.get_shared_memory_bytes()
+            shm_avail_mb = ray.utils.get_shared_memory_bytes() / 1e6
             # Compare the requested memory size to the memory available in
             # /dev/shm.
-            if shm_avail > object_store_memory:
+            if shm_avail_mb > object_store_memory_mb:
                 plasma_directory = "/dev/shm"
             else:
                 plasma_directory = "/tmp"
                 logger.warning(
                     "WARNING: The object store is using /tmp instead of "
-                    "/dev/shm because /dev/shm has only {} bytes available. "
+                    "/dev/shm because /dev/shm has only {}MB available. "
                     "This may slow down performance! You may be able to free "
                     "up space by deleting files in /dev/shm or terminating "
                     "any running plasma_store_server processes. If you are "
                     "inside a Docker container, you may need to pass an "
                     "argument with the flag '--shm-size' to 'docker run'."
-                    .format(shm_avail))
+                    .format(int(shm_avail_mb)))
         else:
             plasma_directory = "/tmp"
 
         # Do some sanity checks.
-        if object_store_memory > system_memory:
+        if object_store_memory_mb > system_memory_mb:
             raise Exception(
                 "The requested object store memory size is greater "
                 "than the total available memory.")
     else:
         plasma_directory = os.path.abspath(plasma_directory)
-        logger.warning("WARNING: object_store_memory is not verified when "
+        logger.warning("WARNING: object_store_memory_mb is not verified when "
                        "plasma_directory is set.")
 
     if not os.path.isdir(plasma_directory):
         raise Exception("The file {} does not exist or is not a directory."
                         .format(plasma_directory))
 
-    return object_store_memory, plasma_directory
+    return object_store_memory_mb, plasma_directory
 
 
 def start_plasma_store(node_ip_address,
@@ -1089,7 +1088,7 @@ def start_plasma_store(node_ip_address,
                        object_manager_port=None,
                        store_stdout_file=None,
                        store_stderr_file=None,
-                       object_store_memory=None,
+                       object_store_memory_mb=None,
                        cleanup=True,
                        plasma_directory=None,
                        huge_pages=False,
@@ -1107,7 +1106,7 @@ def start_plasma_store(node_ip_address,
             to. If no redirection should happen, then this should be None.
         store_stderr_file: A file handle opened for writing to redirect stderr
             to. If no redirection should happen, then this should be None.
-        object_store_memory: The amount of memory (in bytes) to start the
+        object_store_memory_mb: The amount of memory (in megabytes) to start the
             object store with.
         cleanup (bool): True if using Ray in local mode. If cleanup is true,
             then this process will be killed by serices.cleanup() when the
@@ -1121,16 +1120,16 @@ def start_plasma_store(node_ip_address,
     Return:
         The Plasma store socket name.
     """
-    object_store_memory, plasma_directory = determine_plasma_store_config(
-        object_store_memory, plasma_directory, huge_pages)
+    object_store_memory_mb, plasma_directory = (determine_plasma_store_config(
+        object_store_memory_mb, plasma_directory, huge_pages))
 
     # Print the object store memory using two decimal places.
-    object_store_memory_str = (object_store_memory / 10**7) / 10**2
-    logger.info("Starting the Plasma object store with {} GB memory "
+    object_store_memory_str = (object_store_memory_mb // 10) / 10**2
+    logger.info("Starting the Plasma object store with {}GB memory "
                 "using {}.".format(object_store_memory_str, plasma_directory))
     # Start the Plasma store.
     plasma_store_name, p1 = ray.plasma.start_plasma_store(
-        plasma_store_memory=object_store_memory,
+        plasma_store_memory=int(object_store_memory_mb * 10**6),
         use_profiler=RUN_PLASMA_STORE_PROFILER,
         stdout_file=store_stdout_file,
         stderr_file=store_stderr_file,
@@ -1337,7 +1336,7 @@ def start_ray_processes(ray_params, cleanup=True):
         redirect_worker_output=ray_params.redirect_worker_output,
         cleanup=cleanup,
         password=ray_params.redis_password,
-        redis_max_memory=ray_params.redis_max_memory)
+        redis_max_memory_mb=ray_params.redis_max_memory_mb)
     ray_params.address_info["redis_address"] = ray_params.redis_address
     time.sleep(0.1)
 
@@ -1420,7 +1419,7 @@ def start_ray_processes(ray_params, cleanup=True):
         ray_params.redis_address,
         store_stdout_file=plasma_store_stdout_file,
         store_stderr_file=plasma_store_stderr_file,
-        object_store_memory=ray_params.object_store_memory,
+        object_store_memory_mb=ray_params.object_store_memory_mb,
         cleanup=cleanup,
         plasma_directory=ray_params.plasma_directory,
         huge_pages=ray_params.huge_pages,
@@ -1470,7 +1469,7 @@ def start_ray_node(ray_params, cleanup=True):
         ray_params (ray.params.RayParams): The RayParams instance.
The following parameters could be checked: node_ip_address, redis_address, object_manager_ports, node_manager_ports, - num_workers, num_local_schedulers, object_store_memory, + num_workers, num_local_schedulers, object_store_memory_mb, redis_password, worker_path, cleanup, redirect_worker_output, redirect_output, resources, plasma_directory, huge_pages, plasma_store_socket_name, raylet_socket_name, temp_dir, @@ -1498,8 +1497,9 @@ def start_ray_head(ray_params, cleanup=True): following parameters could be checked: address_info, object_manager_ports, node_manager_ports, node_ip_address, redis_port, redis_shard_ports, num_workers, num_local_schedulers, - object_store_memory, redis_max_memory, collect_profiling_data, - worker_path, cleanup, redirect_worker_output, redirect_output, + object_store_memory_mb, redis_max_memory_mb, + collect_profiling_data, worker_path, cleanup, + redirect_worker_output, redirect_output, start_workers_from_local_scheduler, resources, num_redis_shards, redis_max_clients, redis_password, include_webui, huge_pages, plasma_directory, autoscaling_config, plasma_store_socket_name, diff --git a/python/ray/test/cluster_utils.py b/python/ray/test/cluster_utils.py index 9c98c57e6b38..16499e4cd4a3 100644 --- a/python/ray/test/cluster_utils.py +++ b/python/ray/test/cluster_utils.py @@ -64,7 +64,7 @@ def add_node(self, **override_kwargs): All nodes are by default started with the following settings: cleanup=True, resources={"CPU": 1}, - object_store_memory=100 * (2**20) # 100 MB + object_store_memory_mb=100 # 100 MB Args: override_kwargs: Keyword arguments used in `start_ray_head` @@ -77,7 +77,7 @@ def add_node(self, **override_kwargs): "resources": { "CPU": 1 }, - "object_store_memory": 100 * (2**20) # 100 MB + "object_store_memory_mb": 100 # 100 MB } node_kwargs.update(override_kwargs) ray_params = RayParams( diff --git a/python/ray/tune/ray_trial_executor.py b/python/ray/tune/ray_trial_executor.py index 3706d00f0a52..48b1be9b13ab 100644 --- a/python/ray/tune/ray_trial_executor.py +++ b/python/ray/tune/ray_trial_executor.py @@ -105,7 +105,7 @@ def _stop_trial(self, trial, error=False, error_msg=None, stop_tasks.append(trial.runner.__ray_terminate__.remote()) # TODO(ekl) seems like wait hangs when killing actors _, unfinished = ray.wait( - stop_tasks, num_returns=2, timeout=250) + stop_tasks, num_returns=2, timeout_seconds=0.25) except Exception: logger.exception("Error stopping runner.") self.set_status(trial, Trial.ERROR) diff --git a/python/ray/tune/trial_runner.py b/python/ray/tune/trial_runner.py index eddfbc488d8c..87b409b52ae2 100644 --- a/python/ray/tune/trial_runner.py +++ b/python/ray/tune/trial_runner.py @@ -322,7 +322,7 @@ def _memory_debug_string(self): "unexpected crashes. Consider " "reducing the memory used by your application " "or reducing the Ray object store size by setting " - "`object_store_memory` when calling `ray.init`.") + "`object_store_memory_mb` when calling `ray.init`.") else: warn = "" return "Memory usage on this node: {}/{} GB{}".format( diff --git a/python/ray/utils.py b/python/ray/utils.py index cb3c33ecceac..06e626d40e9c 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -316,7 +316,7 @@ def sysctl(command): return result -def get_system_memory(): +def get_system_memory_bytes(): """Return the total amount of system memory in bytes. 
     Returns:
diff --git a/python/ray/worker.py b/python/ray/worker.py
index aa45719ec414..59d1a3c52810 100644
--- a/python/ray/worker.py
+++ b/python/ray/worker.py
@@ -1281,13 +1281,14 @@ def _init(ray_params, driver_id=None):
         ray_params (ray.params.RayParams): The RayParams instance. The
             following parameters could be checked: address_info,
             start_ray_local, object_id_seed, num_workers,
-            num_local_schedulers, object_store_memory, redis_max_memory,
-            collect_profiling_data, local_mode, redirect_worker_output,
-            driver_mode, redirect_output, start_workers_from_local_scheduler,
-            num_cpus, num_gpus, resources, num_redis_shards,
-            redis_max_clients, redis_password, plasma_directory, huge_pages,
-            include_webui, driver_id, plasma_store_socket_name, temp_dir,
-            raylet_socket_name, _internal_config
+            num_local_schedulers, object_store_memory_mb,
+            redis_max_memory_mb, collect_profiling_data, local_mode,
+            redirect_worker_output, driver_mode, redirect_output,
+            start_workers_from_local_scheduler, num_cpus, num_gpus, resources,
+            num_redis_shards, redis_max_clients, redis_password,
+            plasma_directory, huge_pages, include_webui, driver_id,
+            plasma_store_socket_name, temp_dir, raylet_socket_name,
+            _internal_config
         driver_id: The ID of driver.
 
     Returns:
@@ -1305,10 +1306,10 @@ def _init(ray_params, driver_id=None):
     else:
         ray_params.driver_mode = SCRIPT_MODE
 
-    if ray_params.redis_max_memory and ray_params.collect_profiling_data:
+    if ray_params.redis_max_memory_mb and ray_params.collect_profiling_data:
         logger.warning(
             "Profiling data cannot be LRU evicted, so it is disabled "
-            "when redis_max_memory is set.")
+            "when redis_max_memory_mb is set.")
         ray_params.collect_profiling_data = False
 
     # Get addresses of existing services.
@@ -1367,12 +1368,12 @@ def _init(ray_params, driver_id=None):
         if ray_params.redis_max_clients is not None:
             raise Exception("When connecting to an existing cluster, "
                             "redis_max_clients must not be provided.")
-        if ray_params.object_store_memory is not None:
+        if ray_params.object_store_memory_mb is not None:
             raise Exception("When connecting to an existing cluster, "
-                            "object_store_memory must not be provided.")
-        if ray_params.redis_max_memory is not None:
+                            "object_store_memory_mb must not be provided.")
+        if ray_params.redis_max_memory_mb is not None:
             raise Exception("When connecting to an existing cluster, "
-                            "redis_max_memory must not be provided.")
+                            "redis_max_memory_mb must not be provided.")
         if ray_params.plasma_directory is not None:
             raise Exception("When connecting to an existing cluster, "
                             "plasma_directory must not be provided.")
@@ -1435,7 +1436,9 @@ def init(redis_address=None,
          num_gpus=None,
          resources=None,
          object_store_memory=None,
+         object_store_memory_mb=None,
          redis_max_memory=None,
+         redis_max_memory_mb=None,
          collect_profiling_data=True,
          node_ip_address=None,
          object_id_seed=None,
@@ -1491,11 +1494,11 @@ def init(redis_address=None,
         be configured with.
         resources: A dictionary mapping the name of a resource to the quantity
             of that resource available.
-        object_store_memory: The amount of memory (in bytes) to start the
+        object_store_memory_mb: The amount of memory (in megabytes) to start the
             object store with.
-        redis_max_memory: The max amount of memory (in bytes) to allow redis
-            to use, or None for no limit. Once the limit is exceeded, redis
-            will start LRU eviction of entries. This only applies to the
+        redis_max_memory_mb: The max amount of memory (in megabytes) to allow
+            redis to use, or None for no limit. Once the limit is exceeded,
+            redis will start LRU eviction of entries. This only applies to the
             sharded redis tables (task and object tables).
         collect_profiling_data: Whether to collect profiling data from workers.
         node_ip_address (str): The IP address of the node that we are on.
@@ -1558,6 +1561,16 @@ def init(redis_address=None,
         raise DeprecationWarning("The use_raylet argument is deprecated. "
                                  "Please remove it.")
 
+    if object_store_memory is not None:
+        logger.warning("WARNING: The 'object_store_memory' argument has been "
+                       "deprecated. Please use 'object_store_memory_mb'.")
+        object_store_memory_mb = object_store_memory / 10**6
+
+    if redis_max_memory is not None:
+        logger.warning("WARNING: The 'redis_max_memory' argument has been "
+                       "deprecated. Please use 'redis_max_memory_mb'.")
+        redis_max_memory_mb = redis_max_memory / 10**6
+
     if setproctitle is None:
         logger.warning(
             "WARNING: Not updating worker name since `setproctitle` is not "
@@ -1597,8 +1610,8 @@ def init(redis_address=None,
         plasma_directory=plasma_directory,
         huge_pages=huge_pages,
         include_webui=include_webui,
-        object_store_memory=object_store_memory,
-        redis_max_memory=redis_max_memory,
+        object_store_memory_mb=object_store_memory_mb,
+        redis_max_memory_mb=redis_max_memory_mb,
         collect_profiling_data=collect_profiling_data,
         plasma_store_socket_name=plasma_store_socket_name,
         raylet_socket_name=raylet_socket_name,
@@ -2313,13 +2326,17 @@ def put(value, worker=global_worker):
     return object_id
 
 
-def wait(object_ids, num_returns=1, timeout=None, worker=global_worker):
+def wait(object_ids,
+         num_returns=1,
+         timeout_seconds=None,
+         timeout=None,
+         worker=global_worker):
     """Return a list of IDs that are ready and a list of IDs that are not.
 
-    If timeout is set, the function returns either when the requested number of
-    IDs are ready or when the timeout is reached, whichever occurs first. If it
-    is not set, the function simply waits until that number of objects is ready
-    and returns that exact number of object IDs.
+    If timeout_seconds is set, the function returns either when the requested
+    number of IDs are ready or when the timeout is reached, whichever occurs
+    first. If it is not set, the function simply waits until that number of
+    objects is ready and returns that exact number of object IDs.
 
     This method returns two lists. The first list consists of object IDs that
     correspond to objects that are available in the object store. The second
@@ -2335,13 +2352,22 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker):
         object_ids (List[ObjectID]): List of object IDs for objects that may or
             may not be ready. Note that these IDs must be unique.
         num_returns (int): The number of object IDs that should be returned.
-        timeout (int): The maximum amount of time in milliseconds to wait
+        timeout_seconds (float): The maximum amount of time in seconds to wait
             before returning.
+        timeout: This is deprecated. Use timeout_seconds.
 
     Returns:
         A list of object IDs that are ready and a list of the remaining object
         IDs.
     """
+    if timeout is not None:
+        logger.warning("WARNING: The timeout argument has been deprecated. "
+                       "Please use timeout_seconds.")
+        timeout_seconds = timeout / 1000
+
+    if timeout_seconds is not None and timeout_seconds < 0:
+        raise ValueError("The timeout_seconds argument cannot be negative. 
" + "Received {}.".format(timeout_seconds)) if isinstance(object_ids, ray.ObjectID): raise TypeError( @@ -2385,9 +2411,11 @@ def wait(object_ids, num_returns=1, timeout=None, worker=global_worker): with worker.state_lock: current_task_id = worker.get_current_thread_task_id() - timeout = timeout if timeout is not None else 2**30 + timeout_milliseconds = int(timeout_seconds * 1000 + if timeout_seconds is not None else 2**30) ready_ids, remaining_ids = worker.raylet_client.wait( - object_ids, num_returns, timeout, False, current_task_id) + object_ids, num_returns, timeout_milliseconds, False, + current_task_id) return ready_ids, remaining_ids diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index fd70132485bd..b85e531dd11e 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1683,7 +1683,7 @@ void NodeManager::HandleTaskReconstruction(const TaskID &task_id) { "evicted " << "by the redis LRU configuration. Consider increasing the memory " "allocation via " - << "ray.init(redis_max_memory=)."; + << "ray.init(redis_max_memory_mb=)."; // Use a copy of the cached task spec to re-execute the task. const Task task = lineage_cache_.GetTask(task_id); ResubmitTask(task); diff --git a/test/actor_test.py b/test/actor_test.py index 29e3b10852b0..8469b1d2ac8f 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -737,7 +737,7 @@ def method(self): pass f = Foo.remote() - ready_ids, _ = ray.wait([f.method.remote()], timeout=100) + ready_ids, _ = ray.wait([f.method.remote()], timeout_seconds=0.1) assert ready_ids == [] @@ -826,7 +826,8 @@ def get_location_and_ids(self): # Creating a new actor should fail because all of the GPUs are being # used. a = Actor1.remote() - ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10) + ready_ids, _ = ray.wait( + [a.get_location_and_ids.remote()], timeout_seconds=0.01) assert ready_ids == [] @@ -868,7 +869,8 @@ def get_location_and_ids(self): # Creating a new actor should fail because all of the GPUs are being # used. a = Actor1.remote() - ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10) + ready_ids, _ = ray.wait( + [a.get_location_and_ids.remote()], timeout_seconds=0.01) assert ready_ids == [] # We should be able to create more actors that use only a single GPU. @@ -897,7 +899,8 @@ def get_location_and_ids(self): # Creating a new actor should fail because all of the GPUs are being # used. a = Actor2.remote() - ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10) + ready_ids, _ = ray.wait( + [a.get_location_and_ids.remote()], timeout_seconds=0.01) assert ready_ids == [] @@ -938,7 +941,8 @@ def get_location_and_ids(self): # Creating a new actor should fail because all of the GPUs are being # used. a = Actor1.remote() - ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10) + ready_ids, _ = ray.wait( + [a.get_location_and_ids.remote()], timeout_seconds=0.01) assert ready_ids == [] @@ -1017,7 +1021,8 @@ def get_location_and_ids(self): # All the GPUs should be used up now. a = Actor.remote() - ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10) + ready_ids, _ = ray.wait( + [a.get_location_and_ids.remote()], timeout_seconds=0.01) assert ready_ids == [] @@ -1162,7 +1167,7 @@ def locations_to_intervals_for_many_tasks(): # Now if we run some GPU tasks, they should not be scheduled. 
results = [f1.remote() for _ in range(30)] - ready_ids, remaining_ids = ray.wait(results, timeout=1000) + ready_ids, remaining_ids = ray.wait(results, timeout_seconds=1) assert len(ready_ids) == 0 @@ -1271,7 +1276,7 @@ def blocking_method(self): # block. actor = CPUFoo.remote() x_id = actor.blocking_method.remote() - ready_ids, remaining_ids = ray.wait([x_id], timeout=1000) + ready_ids, remaining_ids = ray.wait([x_id], timeout_seconds=1) assert ready_ids == [] assert remaining_ids == [x_id] @@ -1286,7 +1291,7 @@ def blocking_method(self): # Make sure that GPU resources are not released when actors block. actor = GPUFoo.remote() x_id = actor.blocking_method.remote() - ready_ids, remaining_ids = ray.wait([x_id], timeout=1000) + ready_ids, remaining_ids = ray.wait([x_id], timeout_seconds=1) assert ready_ids == [] assert remaining_ids == [x_id] @@ -2023,7 +2028,7 @@ def method(self): actor2s = [Actor2.remote() for _ in range(2)] results = [a.method.remote() for a in actor2s] ready_ids, remaining_ids = ray.wait( - results, num_returns=len(results), timeout=1000) + results, num_returns=len(results), timeout_seconds=1) assert len(ready_ids) == 1 @@ -2085,7 +2090,7 @@ def method(self): ray.wait([result2]) actor3 = ResourceActor1.remote() result3 = actor3.method.remote() - ready_ids, _ = ray.wait([result3], timeout=200) + ready_ids, _ = ray.wait([result3], timeout_seconds=0.2) assert len(ready_ids) == 0 # By deleting actor1, we free up resources to create actor3. @@ -2119,9 +2124,9 @@ def __init__(self): def create_object(self, size): return np.random.rand(size) - object_store_memory = 10**8 + object_store_memory_mb = 100 ray.init( - object_store_memory=object_store_memory, + object_store_memory_mb=object_store_memory_mb, _internal_config=json.dumps({ "initial_reconstruction_timeout_milliseconds": 200 })) @@ -2132,7 +2137,8 @@ def create_object(self, size): objects = [] num_objects = 20 for _ in range(num_objects): - obj = a.create_object.remote(object_store_memory // num_objects) + obj = a.create_object.remote( + (10**6 * object_store_memory_mb) // num_objects) objects.append(obj) # Get each object once to make sure each object gets created. ray.get(obj) @@ -2263,7 +2269,7 @@ def get_object_store_socket(self): # this test. Because if this value is too small, suprious task reconstruction # may happen and cause the test fauilure. If the value is too large, this test # could be very slow. We can remove this once we support dynamic timeout. -@pytest.mark.parametrize('head_node_cluster', [1000], indirect=True) +@pytest.mark.parametrize("head_node_cluster", [1000], indirect=True) def test_multiple_actor_reconstruction(head_node_cluster): # This test can be made more stressful by increasing the numbers below. # The total number of actors created will be diff --git a/test/component_failures_test.py b/test/component_failures_test.py index d42f5e9bb598..006de0253316 100644 --- a/test/component_failures_test.py +++ b/test/component_failures_test.py @@ -86,7 +86,7 @@ def f(id_in_a_list): time.sleep(1) # Make sure the task hasn't finished. - ready_ids, _ = ray.wait([result_id], timeout=0) + ready_ids, _ = ray.wait([result_id], timeout_seconds=0) assert len(ready_ids) == 0 # Kill the worker. @@ -94,7 +94,7 @@ def f(id_in_a_list): time.sleep(0.1) # Make sure the sleep task hasn't finished. - ready_ids, _ = ray.wait([x_id], timeout=0) + ready_ids, _ = ray.wait([x_id], timeout_seconds=0) assert len(ready_ids) == 0 # Seal the object so the store attempts to notify the worker that the # get has been fulfilled. 
@@ -137,7 +137,7 @@ def sleep_forever():
         time.sleep(0.1)
 
     # Make sure the original task hasn't finished.
-    ready_ids, _ = ray.wait([x_id], timeout=0)
+    ready_ids, _ = ray.wait([x_id], timeout_seconds=0)
     assert len(ready_ids) == 0
     # Seal the object so the store attempts to notify the worker that the
     # get has been fulfilled.
@@ -222,7 +222,7 @@ def sleep_forever():
         time.sleep(0.1)
 
     # Make sure the original task hasn't finished.
-    ready_ids, _ = ray.wait([x_id], timeout=0)
+    ready_ids, _ = ray.wait([x_id], timeout_seconds=0)
     assert len(ready_ids) == 0
     # Seal the object so the store attempts to notify the worker that the
     # wait can return.
@@ -401,7 +401,7 @@ def ping(self):
         ready, _ = ray.wait(
             children_out,
             num_returns=len(children_out),
-            timeout=5 * 60 * 1000)
+            timeout_seconds=5 * 60)
         assert len(ready) == len(children_out)
 
         # Replace any actors that died.
diff --git a/test/failure_test.py b/test/failure_test.py
index e3b5223bc28a..621082b2b1f2 100644
--- a/test/failure_test.py
+++ b/test/failure_test.py
@@ -330,7 +330,7 @@ def consume(x):
         pass
 
     a = Actor.remote()
-    [obj], _ = ray.wait([a.kill.remote()], timeout=5000)
+    [obj], _ = ray.wait([a.kill.remote()], timeout_seconds=5)
     with pytest.raises(Exception):
         ray.get(obj)
     with pytest.raises(Exception):
@@ -389,17 +389,17 @@ class Actor(object):
 
 
 @pytest.fixture
-def ray_start_object_store_memory():
+def ray_start_object_store_memory_mb():
     # Start the Ray processes.
-    store_size = 10**6
-    ray.init(num_cpus=1, object_store_memory=store_size)
+    store_size_mb = 1
+    ray.init(num_cpus=1, object_store_memory_mb=store_size_mb)
     yield None
     # The code after the yield will run as teardown code.
     ray.shutdown()
 
 
 @pytest.mark.skip("This test does not work yet.")
-def test_put_error1(ray_start_object_store_memory):
+def test_put_error1(ray_start_object_store_memory_mb):
     num_objects = 3
     object_size = 4 * 10**5
@@ -441,7 +441,7 @@ def put_arg_task():
 
 
 @pytest.mark.skip("This test does not work yet.")
-def test_put_error2(ray_start_object_store_memory):
+def test_put_error2(ray_start_object_store_memory_mb):
     # This is the same as the previous test, but it calls ray.put directly.
num_objects = 3 object_size = 4 * 10**5 diff --git a/test/object_manager_test.py b/test/object_manager_test.py index 928d0dcd8a09..317a11e19ee7 100644 --- a/test/object_manager_test.py +++ b/test/object_manager_test.py @@ -21,7 +21,7 @@ def create_cluster(num_nodes): cluster = Cluster() for i in range(num_nodes): - cluster.add_node(resources={str(i): 100}, object_store_memory=10**9) + cluster.add_node(resources={str(i): 100}, object_store_memory_mb=1000) ray.init(redis_address=cluster.redis_address) return cluster diff --git a/test/runtest.py b/test/runtest.py index 7ed2ab2042bf..dc43608073d8 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -770,7 +770,8 @@ def g(): args=[], num_cpus=1, num_gpus=1, resources={"Custom": 1})) == [0] infeasible_id = g._remote(args=[], resources={"NonexistentCustom": 1}) - ready_ids, remaining_ids = ray.wait([infeasible_id], timeout=50) + ready_ids, remaining_ids = ray.wait( + [infeasible_id], timeout_seconds=0.05) assert len(ready_ids) == 0 assert len(remaining_ids) == 1 @@ -845,14 +846,15 @@ def f(delay): objectids = [f.remote(0.5), f.remote(0.5), f.remote(0.5), f.remote(0.5)] start_time = time.time() - ready_ids, remaining_ids = ray.wait(objectids, timeout=1750, num_returns=4) + ready_ids, remaining_ids = ray.wait( + objectids, timeout_seconds=1.75, num_returns=4) assert time.time() - start_time < 2 assert len(ready_ids) == 3 assert len(remaining_ids) == 1 ray.wait(objectids) objectids = [f.remote(1.0), f.remote(0.5), f.remote(0.5), f.remote(0.5)] start_time = time.time() - ready_ids, remaining_ids = ray.wait(objectids, timeout=5000) + ready_ids, remaining_ids = ray.wait(objectids, timeout_seconds=5) assert time.time() - start_time < 5 assert len(ready_ids) == 1 assert len(remaining_ids) == 3 @@ -1073,7 +1075,7 @@ def test_object_transfer_dump(ray_start_cluster): num_nodes = 3 for i in range(num_nodes): - cluster.add_node(resources={str(i): 1}, object_store_memory=10**9) + cluster.add_node(resources={str(i): 1}, object_store_memory_mb=1000) ray.init(redis_address=cluster.redis_address) @@ -1326,13 +1328,13 @@ def run_one_test(actors, local_only): ] # Case 1: run this local_only=False. All 3 objects will be deleted. (a, b, c) = run_one_test(actors, False) - (l1, l2) = ray.wait([a, b, c], timeout=10, num_returns=1) + (l1, l2) = ray.wait([a, b, c], timeout_seconds=0.01, num_returns=1) # All the objects are deleted. assert len(l1) == 0 assert len(l2) == 3 # Case 2: run this local_only=True. Only 1 object will be deleted. (a, b, c) = run_one_test(actors, True) - (l1, l2) = ray.wait([a, b, c], timeout=10, num_returns=3) + (l1, l2) = ray.wait([a, b, c], timeout_seconds=0.01, num_returns=3) # One object is deleted and 2 objects are not. assert len(l1) == 2 assert len(l2) == 1 @@ -1381,7 +1383,7 @@ def f(): num_returns = 5 object_ids = [ray.put(i) for i in range(20)] ready, remaining = ray.wait( - object_ids, num_returns=num_returns, timeout=None) + object_ids, num_returns=num_returns, timeout_seconds=None) assert_equal(ready, object_ids[:num_returns]) assert_equal(remaining, object_ids[num_returns:]) @@ -1763,7 +1765,7 @@ def method(self): # custom resource. TODO(rkn): Re-enable this once ray.wait is # implemented. f2 = Foo2._remote([], {}, resources={"Custom": 0.7}) - ready, _ = ray.wait([f2.method.remote()], timeout=500) + ready, _ = ray.wait([f2.method.remote()], timeout_seconds=0.5) assert len(ready) == 0 # Make sure we can start an actor that requries only 0.3 of the custom # resource. 
@@ -1992,7 +1994,8 @@ def k(): # Make sure that tasks with unsatisfied custom resource requirements do # not get scheduled. - ready_ids, remaining_ids = ray.wait([j.remote(), k.remote()], timeout=500) + ready_ids, remaining_ids = ray.wait( + [j.remote(), k.remote()], timeout_seconds=0.5) assert ready_ids == [] @@ -2413,7 +2416,7 @@ def test_initialized_local_mode(shutdown_only_with_initialization_check): def test_wait_reconstruction(shutdown_only): - ray.init(num_cpus=1, object_store_memory=10**8) + ray.init(num_cpus=1, object_store_memory_mb=100) @ray.remote def f(): diff --git a/test/stress_tests.py b/test/stress_tests.py index 22edf4a6bbb6..bc6afb99a857 100644 --- a/test/stress_tests.py +++ b/test/stress_tests.py @@ -196,7 +196,7 @@ def ray_start_reconstruction(request): # Start the Plasma store instances with a total of 1GB memory. plasma_store_memory = 10**9 plasma_addresses = [] - object_store_memory = plasma_store_memory // num_local_schedulers + object_store_memory_mb = plasma_store_memory / 1e6 // num_local_schedulers for i in range(num_local_schedulers): store_stdout_file, store_stderr_file = ( ray.tempfile_services.new_plasma_store_log_file(i, True)) @@ -204,7 +204,7 @@ def ray_start_reconstruction(request): ray.services.start_plasma_store( node_ip_address, redis_address, - object_store_memory=object_store_memory, + object_store_memory_mb=object_store_memory_mb, store_stdout_file=store_stdout_file, store_stderr_file=store_stderr_file)) @@ -500,7 +500,7 @@ def error_check(errors): def ray_start_driver_put_errors(): plasma_store_memory = 10**9 # Start the Ray processes. - ray.init(num_cpus=1, object_store_memory=plasma_store_memory) + ray.init(num_cpus=1, object_store_memory_mb=plasma_store_memory / 10**6) yield plasma_store_memory # The code after the yield will run as teardown code. ray.shutdown()
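A minimal migration sketch for the renamed memory parameters in this patch; the specific sizes and num_cpus below are illustrative only, not defaults from this change:

```python
import ray

# New style: sizes are given in megabytes. The deprecated byte-valued
# arguments (e.g. object_store_memory=10**9) still work but log a warning
# and are converted with `value / 10**6`.
ray.init(
    num_cpus=2,
    object_store_memory_mb=1000,  # 1 GB plasma object store
    redis_max_memory_mb=500)  # LRU-evict shard metadata beyond 500 MB
```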
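Likewise for `ray.wait`, where the millisecond-valued `timeout` becomes `timeout_seconds`; a sketch of the conversion, with an illustrative remote task:

```python
import time

import ray

ray.init(num_cpus=2)


@ray.remote
def slow(seconds):
    time.sleep(seconds)
    return seconds


ids = [slow.remote(i) for i in range(4)]

# Before: ray.wait(ids, num_returns=4, timeout=500)  # milliseconds
# After: seconds; fractional values are fine.
ready, not_ready = ray.wait(ids, num_returns=4, timeout_seconds=0.5)
```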