Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions doc/source/redis-memory-management.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ task/object generation rate could risk high memory pressure, potentially leading
to out-of-memory (OOM) errors.

In Ray `0.6.1+` Redis shards can be configured to LRU evict task and object
metadata by setting ``redis_max_memory`` when starting Ray. This supersedes the
previously documented flushing functionality.
metadata by setting ``redis_max_memory_bytes`` when starting Ray. This
supersedes the previously documented flushing functionality.

Note that profiling is disabled when ``redis_max_memory`` is set. This is because
profiling data cannot be LRU evicted.
Note that profiling is disabled when ``redis_max_memory_bytes`` is set. This is
because profiling data cannot be LRU evicted.
2 changes: 1 addition & 1 deletion python/benchmarks/benchmark_wait.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def time_wait_many_tasks(self, num_returns):
time_wait_many_tasks.param_names = ["num_returns"]

def time_wait_timeout(self, timeout):
ray.wait([sleep.remote(0.5)], timeout=timeout)
ray.wait([sleep.remote(0.5)], timeout_milliseconds=timeout)

time_wait_timeout.params = [200, 800]
time_wait_timeout.param_names = ["timeout_ms"]
11 changes: 6 additions & 5 deletions python/ray/experimental/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def get(object_ids, worker=None):
return ray.get(object_ids, worker)


def wait(object_ids, num_returns=1, timeout=None, worker=None):
def wait(object_ids, num_returns=1, timeout_milliseconds=None, worker=None):
"""Return a list of IDs that are ready and a list of IDs that are not.

This method is identical to `ray.wait` except it adds support for tuples
Expand All @@ -52,15 +52,16 @@ def wait(object_ids, num_returns=1, timeout=None, worker=None):
List like of object IDs for objects that may or may not be ready.
Note that these IDs must be unique.
num_returns (int): The number of object IDs that should be returned.
timeout (int): The maximum amount of time in milliseconds to wait
before returning.
timeout_milliseconds (int): The maximum amount of time in milliseconds
to wait before returning.

Returns:
A list of object IDs that are ready and a list of the remaining object
IDs.
"""
worker = ray.worker.global_worker if worker is None else worker
if isinstance(object_ids, (tuple, np.ndarray)):
return ray.wait(list(object_ids), num_returns, timeout, worker)
return ray.wait(
list(object_ids), num_returns, timeout_milliseconds, worker)

return ray.wait(object_ids, num_returns, timeout, worker)
return ray.wait(object_ids, num_returns, timeout_milliseconds, worker)
2 changes: 1 addition & 1 deletion python/ray/experimental/async_plasma.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ def as_future(self, object_id, check_ready=True):
fut = PlasmaObjectFuture(loop=self._loop, object_id=plain_object_id)

if check_ready:
ready, _ = ray.wait([object_id], timeout=0)
ready, _ = ray.wait([object_id], timeout_milliseconds=0)
if ready:
if self._loop.get_debug():
logger.debug("%s has been ready.", plain_object_id)
Expand Down
4 changes: 2 additions & 2 deletions python/ray/experimental/sgd/test_sgd.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
parser.add_argument("--devices-per-worker", default=2, type=int)
parser.add_argument("--stats-interval", default=10, type=int)
parser.add_argument("--all-reduce-alg", default="simple", type=str)
parser.add_argument("--object-store-memory", default=None, type=int)
parser.add_argument("--object-store-memory-bytes", default=None, type=int)
parser.add_argument(
"--warmup", action="store_true", help="Warm up object store before start.")
parser.add_argument(
Expand All @@ -32,7 +32,7 @@
args, _ = parser.parse_known_args()
ray.init(
redis_address=args.redis_address,
object_store_memory=args.object_store_memory)
object_store_memory_bytes=args.object_store_memory_bytes)

model_creator = (
lambda worker_idx, device_idx: TFBenchModel(
Expand Down
4 changes: 2 additions & 2 deletions python/ray/memory_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ def get_message(used_gb, total_gb, threshold):
"\n\nIn addition, ~{} GB of shared memory is ".format(
round(psutil.virtual_memory().shared / 1e9, 2)) +
"currently being used by the Ray object store. You can set "
"the object store size with the `object_store_memory` "
"the object store size with the `object_store_memory_bytes` "
"parameter when starting Ray, and the max Redis size with "
"`redis_max_memory`.")
"`redis_max_memory_bytes`.")


class MemoryMonitor(object):
Expand Down
4 changes: 3 additions & 1 deletion python/ray/rllib/evaluation/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ def collect_episodes(local_evaluator,
for a in remote_evaluators
]
collected, _ = ray.wait(
pending, num_returns=len(pending), timeout=timeout_seconds * 1000)
pending,
num_returns=len(pending),
timeout_milliseconds=timeout_seconds * 1000)
num_metric_batches_dropped = len(pending) - len(collected)

metric_lists = ray.get(collected)
Expand Down
16 changes: 8 additions & 8 deletions python/ray/rllib/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ def create_parser(parser_creator=None):
type=int,
help="Emulate multiple cluster nodes for debugging.")
parser.add_argument(
"--ray-redis-max-memory",
"--ray-redis-max-memory-bytes",
default=None,
type=int,
help="--redis-max-memory to use if starting a new cluster.")
help="--redis-max-memory-bytes to use if starting a new cluster.")
parser.add_argument(
"--ray-object-store-memory",
"--ray-object-store-memory-bytes",
default=None,
type=int,
help="--object-store-memory to use if starting a new cluster.")
help="--object-store-memory-bytes to use if starting a new cluster.")
parser.add_argument(
"--experiment-name",
default="default",
Expand Down Expand Up @@ -125,14 +125,14 @@ def run(args, parser):
"num_cpus": args.ray_num_cpus or 1,
"num_gpus": args.ray_num_gpus or 0,
},
object_store_memory=args.ray_object_store_memory,
redis_max_memory=args.ray_redis_max_memory)
object_store_memory_bytes=args.ray_object_store_memory_bytes,
redis_max_memory_bytes=args.ray_redis_max_memory_bytes)
ray.init(redis_address=cluster.redis_address)
else:
ray.init(
redis_address=args.redis_address,
object_store_memory=args.ray_object_store_memory,
redis_max_memory=args.ray_redis_max_memory,
object_store_memory_bytes=args.ray_object_store_memory_bytes,
redis_max_memory_bytes=args.ray_redis_max_memory_bytes,
num_cpus=args.ray_num_cpus,
num_gpus=args.ray_num_gpus)
run_experiments(
Expand Down
3 changes: 2 additions & 1 deletion python/ray/rllib/utils/actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ def add(self, worker, all_obj_ids):
def completed(self):
pending = list(self._tasks)
if pending:
ready, _ = ray.wait(pending, num_returns=len(pending), timeout=10)
ready, _ = ray.wait(
pending, num_returns=len(pending), timeout_milliseconds=10)
for obj_id in ready:
yield (self._tasks.pop(obj_id), self._objects.pop(obj_id))

Expand Down
41 changes: 31 additions & 10 deletions python/ray/scripts/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,22 @@ def cli(logging_level, logging_format):
"--object-store-memory",
required=False,
type=int,
help="deprecated, use --object-store-memory-bytes")
@click.option(
"--object-store-memory-bytes",
required=False,
type=int,
help="the maximum amount of memory (in bytes) to allow the "
"object store to use")
@click.option(
"--redis-max-memory",
required=False,
type=int,
help="deprecated, use --redis-max-memory-bytes")
@click.option(
"--redis-max-memory-bytes",
required=False,
type=int,
help=("The max amount of memory (in bytes) to allow redis to use, or None "
"for no limit. Once the limit is exceeded, redis will start LRU "
"eviction of entries. This only applies to the sharded "
Expand All @@ -130,8 +140,8 @@ def cli(logging_level, logging_format):
type=bool,
help=("Whether to collect profiling data. Note that "
"profiling data cannot be LRU evicted, so if you set "
"redis_max_memory then profiling will also be disabled to prevent "
"it from consuming all available redis memory."))
"redis_max_memory_bytes then profiling will also be disabled to "
"prevent it from consuming all available redis memory."))
@click.option(
"--num-workers",
required=False,
Expand Down Expand Up @@ -218,11 +228,22 @@ def cli(logging_level, logging_format):
def start(node_ip_address, redis_address, redis_port, num_redis_shards,
redis_max_clients, redis_password, redis_shard_ports,
object_manager_port, node_manager_port, object_store_memory,
redis_max_memory, collect_profiling_data, num_workers, num_cpus,
num_gpus, resources, head, no_ui, block, plasma_directory,
huge_pages, autoscaling_config, no_redirect_worker_output,
no_redirect_output, plasma_store_socket_name, raylet_socket_name,
temp_dir, internal_config):
object_store_memory_bytes, redis_max_memory, redis_max_memory_bytes,
collect_profiling_data, num_workers, num_cpus, num_gpus, resources,
head, no_ui, block, plasma_directory, huge_pages, autoscaling_config,
no_redirect_worker_output, no_redirect_output,
plasma_store_socket_name, raylet_socket_name, temp_dir,
internal_config):
if object_store_memory is not None:
logger.warning("WARNING: The --object-store-memory argument has been "
"deprecated. Please use --object-store-memory-bytes.")
object_store_memory_bytes = object_store_memory

if redis_max_memory is not None:
logger.warning("WARNING: The --redis-max-memory argument has been "
"deprecated. Please use --redis-max-memory-bytes.")
redis_max_memory_bytes = redis_max_memory

# Convert hostnames to numerical IP address.
if node_ip_address is not None:
node_ip_address = services.address_to_ip(node_ip_address)
Expand Down Expand Up @@ -277,8 +298,8 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
node_ip_address=node_ip_address,
redis_port=redis_port,
redis_shard_ports=redis_shard_ports,
object_store_memory=object_store_memory,
redis_max_memory=redis_max_memory,
object_store_memory_bytes=object_store_memory_bytes,
redis_max_memory_bytes=redis_max_memory_bytes,
collect_profiling_data=collect_profiling_data,
num_workers=num_workers,
cleanup=False,
Expand Down Expand Up @@ -364,7 +385,7 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
object_manager_ports=[object_manager_port],
node_manager_ports=[node_manager_port],
num_workers=num_workers,
object_store_memory=object_store_memory,
object_store_memory_bytes=object_store_memory_bytes,
redis_password=redis_password,
cleanup=False,
redirect_worker_output=not no_redirect_worker_output,
Expand Down
Loading