93 changes: 5 additions & 88 deletions doc/source/redis-memory-management.rst
@@ -7,92 +7,9 @@ servers, as described in `An Overview of the Internals
task/object generation rate could risk high memory pressure, potentially leading
to out-of-memory (OOM) errors.

Here, we describe an experimental feature that transparently flushes metadata
entries out of Redis memory.
In Ray `0.6.1+`, Redis shards can be configured to LRU-evict task and object
metadata by setting ``redis_max_memory`` when starting Ray. This supersedes
the previously documented flushing functionality.
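A minimal sketch of starting Ray with such a cap (the 2GB figure is illustrative, not a recommendation; the ``try/except`` only guards against environments without a compatible Ray):

```python
GiB = 1024 ** 3  # 2**30 bytes

try:
    import ray
    # Redis shards begin LRU-evicting task/object metadata past this cap.
    ray.init(redis_max_memory=2 * GiB)
except Exception:
    # Ray may be absent, or a version without this parameter; the call
    # above shows the intended API from this change.
    pass
```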

Requirements
------------

As of early July 2018, the automatic memory management feature requires
building Ray from source. We plan to eliminate this step in the near future by
releasing official wheels.

Building Ray
~~~~~~~~~~~~

First, follow `instructions to build Ray from source
<installation.html#building-ray-from-source>`__ to install prerequisites. After
the prerequisites are installed, instead of doing the regular ``pip install`` as
referenced in that document, pass an additional special flag,
``RAY_USE_NEW_GCS=on``:

.. code-block:: bash

git clone https://github.com/ray-project/ray.git
cd ray/python
RAY_USE_NEW_GCS=on pip install -e . --verbose # Add --user if you see a permission denied error.

Running Ray applications
~~~~~~~~~~~~~~~~~~~~~~~~

At run time the environment variables ``RAY_USE_NEW_GCS=on`` and
``RAY_USE_XRAY=1`` are required.

.. code-block:: bash

export RAY_USE_NEW_GCS=on
export RAY_USE_XRAY=1
python my_ray_script.py # Or launch python/ipython.

Activate memory flushing
------------------------

After building Ray using the method above, simply add these two lines after
``ray.init()`` to activate automatic memory flushing:

.. code-block:: python

ray.init(...)

policy = ray.experimental.SimpleGcsFlushPolicy()
ray.experimental.set_flushing_policy(policy)

# My awesome Ray application logic follows.

Parameters of the flushing policy
---------------------------------

There are three `user-configurable parameters
<https://github.com/ray-project/ray/blob/8190ff1fd0c4b82f73e2c1c0f21de6bda494718c/python/ray/experimental/gcs_flush_policy.py#L31>`_
of the ``SimpleGcsFlushPolicy``:

* ``flush_when_at_least_bytes``: Wait until at least this many bytes of
  memory usage have accumulated in the Redis server before flushing kicks in.
* ``flush_period_secs``: The interval, in seconds, between flushes issued to
  the Redis server.
* ``flush_num_entries_each_time``: A hint to the system on the number of entries
to flush on each request.

The default values should be non-invasive for lightweight Ray applications.
``flush_when_at_least_bytes`` is set to ``(1 << 31)`` or 2GB,
``flush_period_secs`` to 10, and ``flush_num_entries_each_time`` to 10000:

.. code-block:: python

# Default parameters.
ray.experimental.SimpleGcsFlushPolicy(
flush_when_at_least_bytes=(1 << 31),
flush_period_secs=10,
flush_num_entries_each_time=10000)

In particular, these default values imply that

1. the Redis server would accumulate memory usage up to 2GB without any entries
being flushed, then the flushing would kick in; and

2. generally, "older" metadata entries would be flushed first, and the Redis
   server would always retain roughly the most recent 2GB of metadata.

**For advanced users.** Advanced users can tune the above parameters to their
applications' needs; note that the achievable flush rate, in entries per
second, equals (num entries each flush) / (flush period).
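As a quick sanity check on the defaults (pure arithmetic, no Ray required):

```python
# Default SimpleGcsFlushPolicy parameters.
flush_period_secs = 10
flush_num_entries_each_time = 10000

# Sustained flush throughput: entries removed from Redis per second.
entries_per_sec = flush_num_entries_each_time / flush_period_secs
print(entries_per_sec)  # -> 1000.0
```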
Note that profiling is disabled when ``redis_max_memory`` is set. This is because
profiling data cannot be LRU evicted.
3 changes: 2 additions & 1 deletion python/ray/memory_monitor.py
@@ -37,7 +37,8 @@ def get_message(used_gb, total_gb, threshold):
round(psutil.virtual_memory().shared / 1e9, 2)) +
"currently being used by the Ray object store. You can set "
"the object store size with the `object_store_memory` "
"parameter when starting Ray.")
"parameter when starting Ray, and the max Redis size with "
"`redis_max_memory`.")


class MemoryMonitor(object):
13 changes: 13 additions & 0 deletions python/ray/profiling.py
@@ -128,6 +128,19 @@ def add_event(self, event):
self.events.append(event)


class NoopProfiler(object):
"""A no-op profile used when collect_profile_data=False."""

def start_flush_thread(self):
pass

def flush_profile_data(self):
pass

def add_event(self, event):
pass


class RayLogSpanRaylet(object):
"""An object used to enable logging a span of events with a with statement.

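The ``NoopProfiler`` above is an instance of the null-object pattern: callers always receive an object with the profiler interface, so no "if profiling is enabled" branches are needed at call sites. A standalone sketch (the names and factory below are illustrative, not Ray's actual internals):

```python
class Profiler(object):
    """Records profiling events in memory."""

    def __init__(self):
        self.events = []

    def add_event(self, event):
        self.events.append(event)


class NoopProfiler(object):
    """Same interface, but silently drops events when profiling is off."""

    def add_event(self, event):
        pass


def make_profiler(collect_profile_data):
    # Mirrors the collect_profile_data switch introduced in this change.
    return Profiler() if collect_profile_data else NoopProfiler()


profiler = make_profiler(collect_profile_data=False)
profiler.add_event("task_start")  # No-op: nothing is recorded.
```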
21 changes: 13 additions & 8 deletions python/ray/rllib/train.py
@@ -38,30 +38,33 @@ def create_parser(parser_creator=None):
"--redis-address",
default=None,
type=str,
help="The Redis address of the cluster.")
help="Connect to an existing Ray cluster at this address instead "
"of starting a new one.")
parser.add_argument(
"--ray-num-cpus",
default=None,
type=int,
help="--num-cpus to pass to Ray."
" This only has an affect in local mode.")
help="--num-cpus to use if starting a new cluster.")
parser.add_argument(
"--ray-num-gpus",
default=None,
type=int,
help="--num-gpus to pass to Ray."
" This only has an affect in local mode.")
help="--num-gpus to use if starting a new cluster.")
parser.add_argument(
"--ray-num-local-schedulers",
default=None,
type=int,
help="Emulate multiple cluster nodes for debugging.")
parser.add_argument(
"--ray-redis-max-memory",
default=None,
type=int,
help="--redis-max-memory to use if starting a new cluster.")
parser.add_argument(
"--ray-object-store-memory",
default=None,
type=int,
help="--object-store-memory to pass to Ray."
" This only has an affect in local mode.")
help="--object-store-memory to use if starting a new cluster.")
parser.add_argument(
"--experiment-name",
default="default",
@@ -122,12 +125,14 @@ def run(args, parser):
"num_cpus": args.ray_num_cpus or 1,
"num_gpus": args.ray_num_gpus or 0,
},
object_store_memory=args.ray_object_store_memory)
object_store_memory=args.ray_object_store_memory,
redis_max_memory=args.ray_redis_max_memory)
ray.init(redis_address=cluster.redis_address)
else:
ray.init(
redis_address=args.redis_address,
object_store_memory=args.ray_object_store_memory,
redis_max_memory=args.ray_redis_max_memory,
num_cpus=args.ray_num_cpus,
num_gpus=args.ray_num_gpus)
run_experiments(
28 changes: 23 additions & 5 deletions python/ray/scripts/scripts.py
@@ -116,6 +116,22 @@ def cli(logging_level, logging_format):
type=int,
help="the maximum amount of memory (in bytes) to allow the "
"object store to use")
@click.option(
"--redis-max-memory",
required=False,
type=int,
help=("The max amount of memory (in bytes) to allow redis to use, or None "
"for no limit. Once the limit is exceeded, redis will start LRU "
"eviction of entries. This only applies to the sharded "
"redis tables (task and object tables)."))
@click.option(
"--collect-profiling-data",
default=True,
type=bool,
help=("Whether to collect profiling data. Note that "
"profiling data cannot be LRU evicted, so if you set "
"redis_max_memory then profiling will also be disabled to prevent "
"it from consuming all available redis memory."))
@click.option(
"--num-workers",
required=False,
@@ -202,11 +218,11 @@ def cli(logging_level, logging_format):
def start(node_ip_address, redis_address, redis_port, num_redis_shards,
redis_max_clients, redis_password, redis_shard_ports,
object_manager_port, node_manager_port, object_store_memory,
num_workers, num_cpus, num_gpus, resources, head, no_ui, block,
plasma_directory, huge_pages, autoscaling_config,
no_redirect_worker_output, no_redirect_output,
plasma_store_socket_name, raylet_socket_name, temp_dir,
internal_config):
redis_max_memory, collect_profiling_data, num_workers, num_cpus,
num_gpus, resources, head, no_ui, block, plasma_directory,
huge_pages, autoscaling_config, no_redirect_worker_output,
no_redirect_output, plasma_store_socket_name, raylet_socket_name,
temp_dir, internal_config):
# Convert hostnames to numerical IP address.
if node_ip_address is not None:
node_ip_address = services.address_to_ip(node_ip_address)
@@ -262,6 +278,8 @@ def start(node_ip_address, redis_address, redis_port, num_redis_shards,
redis_port=redis_port,
redis_shard_ports=redis_shard_ports,
object_store_memory=object_store_memory,
redis_max_memory=redis_max_memory,
collect_profiling_data=collect_profiling_data,
num_workers=num_workers,
cleanup=False,
redirect_worker_output=not no_redirect_worker_output,
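Taken together, the flags added here might be used like this when bringing up a head node (byte values are illustrative):

```shell
# 1 GiB cap on the sharded Redis tables; since profiling data cannot be
# LRU-evicted, profiling is disabled alongside the cap.
ray start --head \
    --redis-max-memory=1073741824 \
    --object-store-memory=1000000000
```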