8 changes: 4 additions & 4 deletions doc/source/redis-memory-management.rst
@@ -8,8 +8,8 @@ task/object generation rate could risk high memory pressure, potentially leading
to out-of-memory (OOM) errors.

In Ray `0.6.1+` Redis shards can be configured to LRU evict task and object
metadata by setting ``redis_max_memory`` when starting Ray. This supersedes the
previously documented flushing functionality.
metadata by setting ``redis_max_memory_mb`` when starting Ray. This
supersedes the previously documented flushing functionality.

Note that profiling is disabled when ``redis_max_memory`` is set. This is because
profiling data cannot be LRU evicted.
Note that profiling is disabled when ``redis_max_memory_mb`` is set. This is
because profiling data cannot be LRU evicted.
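For illustration, a minimal sketch of starting Ray with this cap (the keyword matches the ray.init call updated in python/ray/rllib/train.py below; the 1000 MB value is only an example):

import ray

# Cap the sharded Redis tables at 1000 MB. Once the limit is exceeded,
# Redis starts LRU eviction of task and object metadata. Profiling is
# disabled when this cap is set, since profiling data cannot be evicted.
ray.init(redis_max_memory_mb=1000)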
16 changes: 12 additions & 4 deletions python/ray/experimental/api.py
@@ -41,7 +41,7 @@ def get(object_ids, worker=None):
return ray.get(object_ids, worker)


def wait(object_ids, num_returns=1, timeout=None, worker=None):
def wait(object_ids, num_returns=1, timeout_seconds=None, worker=None):
Contributor comment: We've agreed offline that this will go back to timeout in #3706. Note that this preserves the API but changes the semantics (units) of the timeout parameter in the 0.6.2 release.

"""Return a list of IDs that are ready and a list of IDs that are not.

This method is identical to `ray.wait` except it adds support for tuples
@@ -52,7 +52,7 @@ def wait(object_ids, num_returns=1, timeout=None, worker=None):
List-like of object IDs for objects that may or may not be ready.
Note that these IDs must be unique.
num_returns (int): The number of object IDs that should be returned.
timeout (int): The maximum amount of time in milliseconds to wait
timeout_seconds (int): The maximum amount of time in seconds to wait
before returning.

Returns:
@@ -61,6 +61,14 @@
"""
worker = ray.worker.global_worker if worker is None else worker
if isinstance(object_ids, (tuple, np.ndarray)):
return ray.wait(list(object_ids), num_returns, timeout, worker)
return ray.wait(
list(object_ids),
num_returns=num_returns,
timeout_seconds=timeout_seconds,
worker=worker)

return ray.wait(object_ids, num_returns, timeout, worker)
return ray.wait(
object_ids,
num_returns=num_returns,
timeout_seconds=timeout_seconds,
worker=worker)
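To make the unit change concrete, here is a usage sketch of this experimental wrapper (the import path is an assumption; the timeout_seconds keyword and the tuple support are taken from the diff above):

import ray
# Assumed import path; the helper is defined in python/ray/experimental/api.py.
from ray.experimental.api import wait

ray.init()

@ray.remote
def f(x):
    return x

object_ids = tuple(f.remote(i) for i in range(4))
# Block for at most 2 seconds waiting for two results. Under the old
# signature, timeout=2 would have meant 2 milliseconds.
ready, not_ready = wait(object_ids, num_returns=2, timeout_seconds=2)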
2 changes: 1 addition & 1 deletion python/ray/experimental/async_plasma.py
@@ -219,7 +219,7 @@ def as_future(self, object_id, check_ready=True):
fut = PlasmaObjectFuture(loop=self._loop, object_id=plain_object_id)

if check_ready:
ready, _ = ray.wait([object_id], timeout=0)
ready, _ = ray.wait([object_id], timeout_seconds=0)
if ready:
if self._loop.get_debug():
logger.debug("%s has been ready.", plain_object_id)
4 changes: 2 additions & 2 deletions python/ray/experimental/sgd/test_sgd.py
@@ -20,7 +20,7 @@
parser.add_argument("--devices-per-worker", default=2, type=int)
parser.add_argument("--stats-interval", default=10, type=int)
parser.add_argument("--all-reduce-alg", default="simple", type=str)
parser.add_argument("--object-store-memory", default=None, type=int)
parser.add_argument("--object-store-memory-mb", default=None, type=int)
parser.add_argument(
"--warmup", action="store_true", help="Warm up object store before start.")
parser.add_argument(
@@ -32,7 +32,7 @@
args, _ = parser.parse_known_args()
ray.init(
redis_address=args.redis_address,
object_store_memory=args.object_store_memory)
object_store_memory_mb=args.object_store_memory_mb)

model_creator = (
lambda worker_idx, device_idx: TFBenchModel(
4 changes: 2 additions & 2 deletions python/ray/memory_monitor.py
@@ -36,9 +36,9 @@ def get_message(used_gb, total_gb, threshold):
"\n\nIn addition, ~{} GB of shared memory is ".format(
round(psutil.virtual_memory().shared / 1e9, 2)) +
"currently being used by the Ray object store. You can set "
"the object store size with the `object_store_memory` "
"the object store size with the `object_store_memory_mb` "
"parameter when starting Ray, and the max Redis size with "
"`redis_max_memory`.")
"`redis_max_memory_mb`.")


class MemoryMonitor(object):
18 changes: 9 additions & 9 deletions python/ray/parameter.py
@@ -40,11 +40,11 @@ class RayParams(object):
This is only provided if start_ray_local is True.
resources: A dictionary mapping the name of a resource to the quantity
of that resource available.
object_store_memory: The amount of memory (in bytes) to start the
object store with.
redis_max_memory: The max amount of memory (in bytes) to allow redis
to use, or None for no limit. Once the limit is exceeded, redis
will start LRU eviction of entries. This only applies to the
object_store_memory_mb: The amount of memory (in megabytes) to start
the object store with.
redis_max_memory_mb: The max amount of memory (in megabytes) to allow
redis to use, or None for no limit. Once the limit is exceeded,
redis will start LRU eviction of entries. This only applies to the
sharded redis tables (task and object tables).
object_manager_ports (list): A list of the ports to use for the object
managers. There should be one per object manager being started on
@@ -100,8 +100,8 @@ def __init__(self,
num_gpus=None,
num_local_schedulers=None,
resources=None,
object_store_memory=None,
redis_max_memory=None,
object_store_memory_mb=None,
redis_max_memory_mb=None,
redis_port=None,
redis_shard_ports=None,
object_manager_ports=None,
@@ -137,8 +137,8 @@ def __init__(self,
self.num_gpus = num_gpus
self.num_local_schedulers = num_local_schedulers
self.resources = resources
self.object_store_memory = object_store_memory
self.redis_max_memory = redis_max_memory
self.object_store_memory_mb = object_store_memory_mb
self.redis_max_memory_mb = redis_max_memory_mb
self.redis_port = redis_port
self.redis_shard_ports = redis_shard_ports
self.object_manager_ports = object_manager_ports
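A short sketch of constructing RayParams with the renamed fields (the field names come from the diff above; the values are illustrative):

from ray.parameter import RayParams

# 500 MB object store, 1000 MB cap on the sharded Redis tables.
params = RayParams(
    object_store_memory_mb=500,
    redis_max_memory_mb=1000)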
2 changes: 0 additions & 2 deletions python/ray/plasma/plasma.py
@@ -11,8 +11,6 @@

__all__ = ["start_plasma_store", "DEFAULT_PLASMA_STORE_MEMORY"]

PLASMA_WAIT_TIMEOUT = 2**30

DEFAULT_PLASMA_STORE_MEMORY = 10**9


3 changes: 3 additions & 0 deletions python/ray/ray_constants.py
@@ -82,3 +82,6 @@ def env_integer(key, default):
NO_RECONSTRUCTION = 0
# A constant indicating that an actor should be reconstructed infinite times.
INFINITE_RECONSTRUCTION = 2**30

# Max megabytes to allocate to plasma unless overridden by the user
DEFAULT_MAX_MEMORY_MB = 20 * 1000
Collaborator (author) comment: Moved to this file from services.py.
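For reference, this default works out to 20 * 1000 = 20,000 MB, i.e. roughly 20 GB as the cap on plasma allocation.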

2 changes: 1 addition & 1 deletion python/ray/rllib/evaluation/metrics.py
@@ -32,7 +32,7 @@ def collect_episodes(local_evaluator,
for a in remote_evaluators
]
collected, _ = ray.wait(
pending, num_returns=len(pending), timeout=timeout_seconds * 1000)
pending, num_returns=len(pending), timeout_seconds=timeout_seconds)
num_metric_batches_dropped = len(pending) - len(collected)

metric_lists = ray.get(collected)
16 changes: 8 additions & 8 deletions python/ray/rllib/train.py
@@ -57,15 +57,15 @@ def create_parser(parser_creator=None):
type=int,
help="Emulate multiple cluster nodes for debugging.")
parser.add_argument(
"--ray-redis-max-memory",
"--ray-redis-max-memory-mb",
default=None,
type=int,
help="--redis-max-memory to use if starting a new cluster.")
help="--redis-max-memory-mb to use if starting a new cluster.")
parser.add_argument(
"--ray-object-store-memory",
"--ray-object-store-memory-mb",
default=None,
type=int,
help="--object-store-memory to use if starting a new cluster.")
help="--object-store-memory-mb to use if starting a new cluster.")
parser.add_argument(
"--experiment-name",
default="default",
@@ -130,14 +130,14 @@ def run(args, parser):
"num_cpus": args.ray_num_cpus or 1,
"num_gpus": args.ray_num_gpus or 0,
},
object_store_memory=args.ray_object_store_memory,
redis_max_memory=args.ray_redis_max_memory)
object_store_memory_mb=args.ray_object_store_memory_mb,
redis_max_memory_mb=args.ray_redis_max_memory_mb)
ray.init(redis_address=cluster.redis_address)
else:
ray.init(
redis_address=args.redis_address,
object_store_memory=args.ray_object_store_memory,
redis_max_memory=args.ray_redis_max_memory,
object_store_memory_mb=args.ray_object_store_memory_mb,
redis_max_memory_mb=args.ray_redis_max_memory_mb,
num_cpus=args.ray_num_cpus,
num_gpus=args.ray_num_gpus)
run_experiments(
3 changes: 2 additions & 1 deletion python/ray/rllib/utils/actors.py
@@ -28,7 +28,8 @@ def add(self, worker, all_obj_ids):
def completed(self):
pending = list(self._tasks)
if pending:
ready, _ = ray.wait(pending, num_returns=len(pending), timeout=10)
ready, _ = ray.wait(
pending, num_returns=len(pending), timeout_seconds=0.01)
for obj_id in ready:
yield (self._tasks.pop(obj_id), self._objects.pop(obj_id))

52 changes: 38 additions & 14 deletions python/ray/scripts/scripts.py
@@ -113,27 +113,39 @@ def cli(logging_level, logging_format):
type=int,
help="the port to use for starting the node manager")
@click.option(
"--object-store-memory",
"--object-store-memory-mb",
required=False,
type=int,
help="the maximum amount of memory (in bytes) to allow the "
"object store to use")
@click.option(
"--redis-max-memory",
"--object-store-memory",
required=False,
type=int,
help="--object-store-memory is deprecated, please use "
"--object-store-memory-mb ")
@click.option(
"--redis-max-memory-mb",
required=False,
type=int,
help=("The max amount of memory (in bytes) to allow redis to use, or None "
"for no limit. Once the limit is exceeded, redis will start LRU "
"eviction of entries. This only applies to the sharded "
"redis tables (task and object tables)."))
@click.option(
"--redis-max-memory",
required=False,
type=int,
help="--redis-max-memory is deprecated, please use "
"--redis-max-memory-mb")
@click.option(
"--collect-profiling-data",
default=True,
type=bool,
help=("Whether to collect profiling data. Note that "
"profiling data cannot be LRU evicted, so if you set "
"redis_max_memory then profiling will also be disabled to prevent "
"it from consuming all available redis memory."))
help="Whether to collect profiling data. Note that profiling data cannot "
"be LRU evicted, so if you set redis_max_memory_mb then profiling will "
"also be disabled to prevent it from consuming all available redis "
"memory.")
@click.option(
"--num-workers",
required=False,
@@ -219,12 +231,24 @@ def cli(logging_level, logging_format):
help="Do NOT use this. This is for debugging/development purposes ONLY.")
def start(node_ip_address, redis_address, redis_port, num_redis_shards,
redis_max_clients, redis_password, redis_shard_ports,
object_manager_port, node_manager_port, object_store_memory,
redis_max_memory, collect_profiling_data, num_workers, num_cpus,
num_gpus, resources, head, no_ui, block, plasma_directory,
huge_pages, autoscaling_config, no_redirect_worker_output,
no_redirect_output, plasma_store_socket_name, raylet_socket_name,
temp_dir, internal_config):
object_manager_port, node_manager_port, object_store_memory_mb,
object_store_memory, redis_max_memory_mb, redis_max_memory,
collect_profiling_data, num_workers, num_cpus, num_gpus, resources,
head, no_ui, block, plasma_directory, huge_pages, autoscaling_config,
no_redirect_worker_output, no_redirect_output,
plasma_store_socket_name, raylet_socket_name, temp_dir,
internal_config):
if object_store_memory is not None:
logger.warning("WARNING: The '--object-store-memory' argument has "
"been deprecated. Please use "
"'--object-store-memory-mb'.")
object_store_memory_mb = object_store_memory / 10**6

if redis_max_memory is not None:
logger.warning("WARNING: The '--redis-max-memory' argument has been "
"deprecated. Please use '--redis-max-memory-mb'.")
redis_max_memory_mb = redis_max_memory / 10**6

# Convert hostnames to numerical IP address.
if node_ip_address is not None:
node_ip_address = services.address_to_ip(node_ip_address)
@@ -250,7 +274,7 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
object_manager_ports=[object_manager_port],
node_manager_ports=[node_manager_port],
num_workers=num_workers,
object_store_memory=object_store_memory,
object_store_memory_mb=object_store_memory_mb,
redis_password=redis_password,
redirect_worker_output=not no_redirect_worker_output,
redirect_output=not no_redirect_output,
@@ -291,7 +315,7 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
ray_params.update_if_absent(
redis_port=redis_port,
redis_shard_ports=redis_shard_ports,
redis_max_memory=redis_max_memory,
redis_max_memory_mb=redis_max_memory_mb,
collect_profiling_data=collect_profiling_data,
num_redis_shards=num_redis_shards,
redis_max_clients=redis_max_clients,
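With the renamed flags, a head node would be started with, for example, ray start --head --object-store-memory-mb 500 --redis-max-memory-mb 1000 (values illustrative). The deprecation shim in start() maps an old-style bytes value onto the new MB parameter; a standalone sketch of that conversion:

# Old flag value: 500 MB expressed in bytes, as --object-store-memory took it.
object_store_memory = 500 * 10**6
# The shim's conversion, mirroring start() above.
object_store_memory_mb = object_store_memory / 10**6
assert object_store_memory_mb == 500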