7 changes: 7 additions & 0 deletions ci/jenkins_tests/run_rllib_tests.sh
@@ -62,6 +62,13 @@ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
--stop '{"training_iteration": 2}' \
--config '{"remote_worker_envs": true, "num_envs_per_worker": 2, "num_workers": 1, "train_batch_size": 100, "sgd_minibatch_size": 50}'

docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
/ray/python/ray/rllib/tests/run_silent.sh train.py \
--env CartPole-v1 \
--run PPO \
--stop '{"training_iteration": 2}' \
--config '{"async_remote_worker_envs": true, "num_envs_per_worker": 2, "num_workers": 1, "train_batch_size": 100, "sgd_minibatch_size": 50}'

docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
/ray/python/ray/rllib/tests/run_silent.sh train.py \
--env Pendulum-v0 \
14 changes: 6 additions & 8 deletions doc/source/rllib-env.rst
@@ -66,7 +66,7 @@ For a full runnable code example using the custom environment API, see `custom_e

.. warning::

Please do **not** try to use gym registration to register custom environments. The gym registry is not compatible with Ray. Instead, always use the registration flows documented above.
The gym registry is not compatible with Ray. Instead, always use the registration flows documented above to ensure Ray workers can access the environment.
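For reference, a minimal sketch of the ``register_env`` flow referenced above (``MyEnv`` is a placeholder for your own gym-style environment class):

.. code-block:: python

    from ray.tune.registry import register_env

    def env_creator(env_config):
        # env_config is the "env_config" dict from the agent config.
        return MyEnv(env_config)  # MyEnv is a hypothetical env class

    register_env("my_env", env_creator)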

Configuring Environments
------------------------
@@ -119,7 +119,7 @@ Vectorized

RLlib will auto-vectorize Gym envs for batch evaluation if the ``num_envs_per_worker`` config is set, or you can define a custom environment class that subclasses `VectorEnv <https://github.com/ray-project/ray/blob/master/python/ray/rllib/env/vector_env.py>`__ to implement ``vector_step()`` and ``vector_reset()``.

Note that auto-vectorization only applies to policy inference by default. This means that policy inference will be batched, but your envs will still be stepped one at a time. If you would like your envs to be stepped in parallel, you can set ``"remote_worker_envs": True``. This will create env instances in Ray actors and step them in parallel. These remote processes introduce communication overheads, so this only helps if your env is very expensive to step.
Note that auto-vectorization only applies to policy inference by default. This means that policy inference will be batched, but your envs will still be stepped one at a time. If you would like your envs to be stepped in parallel, you can set ``"remote_worker_envs": True`` or ``"async_remote_worker_envs": True``. This will create env instances in Ray actors and step them in parallel. These remote processes introduce communication overheads, so this only helps if your env is very expensive to step.
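As a sketch of the equivalent Python usage (assuming the Tune experiment API of this release), mirroring the CI command above:

.. code-block:: python

    import ray
    from ray import tune

    ray.init()
    tune.run_experiments({
        "remote_env_demo": {
            "run": "PPO",
            "stop": {"training_iteration": 2},
            "config": {
                "env": "CartPole-v1",
                "num_workers": 1,
                "num_envs_per_worker": 2,
                # Step each worker's envs in parallel Ray actors; swap in
                # "async_remote_worker_envs": True for the async variant.
                "remote_worker_envs": True,
            },
        },
    })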

Multi-Agent and Hierarchical
----------------------------
@@ -319,11 +319,9 @@ Note that envs can read from different partitions of the logs based on the ``wor

.. seealso::

`RLlib I/O <rllib-offline.html>`__ provides higher-level interfaces for working with offline experience datasets.
`Offline Datasets <rllib-offline.html>`__ provide higher-level interfaces for working with offline experience datasets.

Batch Asynchronous
------------------
Advanced Integrations
---------------------

The lowest-level "catch-all" environment supported by RLlib is `BaseEnv <https://github.com/ray-project/ray/blob/master/python/ray/rllib/env/base_env.py>`__. BaseEnv models multiple agents executing asynchronously in multiple environments. A call to ``poll()`` returns observations from ready agents keyed by their environment and agent ids, and actions for those agents can be sent back via ``send_actions()``. This interface can be subclassed directly to support batched simulators such as `ELF <https://github.com/facebookresearch/ELF>`__.

Under the hood, all other envs are converted to BaseEnv by RLlib so that there is a common internal path for policy evaluation.
For more complex or high-performance environment integrations, you can instead extend the low-level `BaseEnv <https://github.com/ray-project/ray/blob/master/python/ray/rllib/env/base_env.py>`__ class. This low-level API models multiple agents executing asynchronously in multiple environments. A call to ``BaseEnv.poll()`` returns observations from ready agents keyed by their environment and agent ids, and actions for those agents are sent back via ``BaseEnv.send_actions()``. BaseEnv is used to implement all the other env types in RLlib, so it offers a superset of their functionality. For example, ``BaseEnv`` is used to implement dynamic batching of observations for inference over `multiple simulator actors <https://github.com/ray-project/ray/blob/master/python/ray/rllib/env/remote_vector_env.py>`__.
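To illustrate the interface, a minimal driver loop over a ``BaseEnv`` could look like the sketch below (``compute_action`` is a hypothetical stand-in for policy inference; RLlib's policy evaluator normally runs this loop for you):

.. code-block:: python

    while True:
        # poll() only returns entries for envs whose agents are ready to act.
        obs, rewards, dones, infos, off_policy_actions = env.poll()
        actions = {
            env_id: {
                agent_id: compute_action(agent_obs)  # hypothetical helper
                for agent_id, agent_obs in agent_obs_dict.items()
            }
            for env_id, agent_obs_dict in obs.items()
        }
        env.send_actions(actions)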
2 changes: 1 addition & 1 deletion doc/source/rllib.rst
Original file line number Diff line number Diff line change
@@ -40,7 +40,7 @@ Environments
* `Vectorized <rllib-env.html#vectorized>`__
* `Multi-Agent and Hierarchical <rllib-env.html#multi-agent-and-hierarchical>`__
* `Interfacing with External Agents <rllib-env.html#interfacing-with-external-agents>`__
* `Batch Asynchronous <rllib-env.html#batch-asynchronous>`__
* `Advanced Integrations <rllib-env.html#advanced-integrations>`__

Algorithms
----------
29 changes: 10 additions & 19 deletions python/ray/rllib/agents/agent.py
@@ -134,6 +134,9 @@
# remote processes instead of in the same worker. This adds overheads, but
# can make sense if your envs are very CPU intensive (e.g., for StarCraft).
"remote_worker_envs": False,
# Similar to remote_worker_envs, but runs the envs asynchronously in the
# background for greater efficiency. Conflicts with remote_worker_envs.
"async_remote_worker_envs": False,

# === Offline Datasets ===
# __sphinx_doc_input_begin__
@@ -473,9 +476,7 @@ def make_local_evaluator(self,
"tf_session_args": self.
config["local_evaluator_tf_session_args"]
}),
extra_config or {}),
remote_worker_envs=False,
)
extra_config or {}))

@DeveloperAPI
def make_remote_evaluators(self, env_creator, policy_graph, count):
@@ -490,14 +491,8 @@ def make_remote_evaluators(self, env_creator, policy_graph, count):
cls = PolicyEvaluator.as_remote(**remote_args).remote

return [
self._make_evaluator(
cls,
env_creator,
policy_graph,
i + 1,
self.config,
remote_worker_envs=self.config["remote_worker_envs"])
for i in range(count)
self._make_evaluator(cls, env_creator, policy_graph, i + 1,
self.config) for i in range(count)
]

@DeveloperAPI
@@ -563,13 +558,8 @@ def _validate_config(config):
"`input_evaluation` must be a list of strings, got {}".format(
config["input_evaluation"]))

def _make_evaluator(self,
cls,
env_creator,
policy_graph,
worker_index,
config,
remote_worker_envs=False):
def _make_evaluator(self, cls, env_creator, policy_graph, worker_index,
config):
def session_creator():
logger.debug("Creating TF session {}".format(
config["tf_session_args"]))
@@ -639,7 +629,8 @@ def session_creator():
input_creator=input_creator,
input_evaluation=input_evaluation,
output_creator=output_creator,
remote_worker_envs=remote_worker_envs)
remote_worker_envs=config["remote_worker_envs"],
async_remote_worker_envs=config["async_remote_worker_envs"])

@override(Trainable)
def _export_model(self, export_formats, export_dir):
69 changes: 49 additions & 20 deletions python/ray/rllib/env/base_env.py
@@ -38,48 +38,71 @@ class BaseEnv(object):
"env_0": {
"car_0": [2.4, 1.6],
"car_1": [3.4, -3.2],
}
},
"env_1": {
"car_0": [8.0, 4.1],
},
"env_2": {
"car_0": [2.3, 3.3],
"car_1": [1.4, -0.2],
"car_3": [1.2, 0.1],
},
}
>>> env.send_actions(
actions={
"env_0": {
"car_0": 0,
"car_1": 1,
}
}, ...
})
>>> obs, rewards, dones, infos, off_policy_actions = env.poll()
>>> print(obs)
{
"env_0": {
"car_0": [4.1, 1.7],
"car_1": [3.2, -4.2],
}
}, ...
}
>>> print(dones)
{
"env_0": {
"__all__": False,
"car_0": False,
"car_1": True,
}
}, ...
}
"""

@staticmethod
def to_base_env(env, make_env=None, num_envs=1, remote_envs=False):
def to_base_env(env,
make_env=None,
num_envs=1,
remote_envs=False,
async_remote_envs=False):
"""Wraps any env type as needed to expose the async interface."""
if remote_envs and num_envs == 1:

from ray.rllib.env.remote_vector_env import RemoteVectorEnv
if (remote_envs or async_remote_envs) and num_envs == 1:
raise ValueError(
"Remote envs only make sense to use if num_envs > 1 "
"(i.e. vectorization is enabled).")
if remote_envs and async_remote_envs:
raise ValueError("You can only specify one of remote_envs or "
"async_remote_envs.")

if not isinstance(env, BaseEnv):
if isinstance(env, MultiAgentEnv):
if remote_envs:
raise NotImplementedError(
"Remote multiagent environments are not implemented")

env = _MultiAgentEnvToBaseEnv(
make_env=make_env, existing_envs=[env], num_envs=num_envs)
env = RemoteVectorEnv(
make_env, num_envs, multiagent=True, sync=True)
elif async_remote_envs:
env = RemoteVectorEnv(
make_env, num_envs, multiagent=True, sync=False)
else:
env = _MultiAgentEnvToBaseEnv(
make_env=make_env,
existing_envs=[env],
num_envs=num_envs)
elif isinstance(env, ExternalEnv):
if num_envs != 1:
raise ValueError(
@@ -88,15 +111,21 @@ def to_base_env(env, make_env=None, num_envs=1, remote_envs=False):
elif isinstance(env, VectorEnv):
env = _VectorEnvToBaseEnv(env)
else:
env = VectorEnv.wrap(
make_env=make_env,
existing_envs=[env],
num_envs=num_envs,
remote_envs=remote_envs,
action_space=env.action_space,
observation_space=env.observation_space)
env = _VectorEnvToBaseEnv(env)
assert isinstance(env, BaseEnv)
if remote_envs:
env = RemoteVectorEnv(
make_env, num_envs, multiagent=False, sync=True)
elif async_remote_envs:
env = RemoteVectorEnv(
make_env, num_envs, multiagent=False, sync=False)
else:
env = VectorEnv.wrap(
make_env=make_env,
existing_envs=[env],
num_envs=num_envs,
action_space=env.action_space,
observation_space=env.observation_space)
env = _VectorEnvToBaseEnv(env)
assert isinstance(env, BaseEnv), env
return env

@PublicAPI
118 changes: 118 additions & 0 deletions python/ray/rllib/env/remote_vector_env.py
@@ -0,0 +1,118 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import logging

import ray
from ray.rllib.env.base_env import BaseEnv, _DUMMY_AGENT_ID

logger = logging.getLogger(__name__)


class RemoteVectorEnv(BaseEnv):
"""Vector env that executes envs in remote workers.

This provides dynamic batching of inference as observations are returned
from the remote simulator actors. Both single and multi-agent child envs
are supported, and envs can be stepped synchronously or asynchronously.
"""

def __init__(self, make_env, num_envs, multiagent, sync):
self.make_local_env = make_env
if sync:
self.timeout = 9999999.0 # wait for all envs
else:
self.timeout = 0.0 # wait for only ready envs

def make_remote_env(i):
logger.info("Launching env {} in remote actor".format(i))
if multiagent:
return _RemoteMultiAgentEnv.remote(self.make_local_env, i)
else:
return _RemoteSingleAgentEnv.remote(self.make_local_env, i)

self.actors = [make_remote_env(i) for i in range(num_envs)]
self.pending = None # lazy init

def poll(self):
if self.pending is None:
self.pending = {a.reset.remote(): a for a in self.actors}

# each keyed by env_id in [0, num_remote_envs)
obs, rewards, dones, infos = {}, {}, {}, {}
ready = []

# Wait for at least 1 env to be ready here
while not ready:
ready, _ = ray.wait(
list(self.pending),
num_returns=len(self.pending),
timeout=self.timeout)

# Get and return observations for each of the ready envs
env_ids = set()
for obj_id in ready:
actor = self.pending.pop(obj_id)
env_id = self.actors.index(actor)
env_ids.add(env_id)
ob, rew, done, info = ray.get(obj_id)
obs[env_id] = ob
rewards[env_id] = rew
dones[env_id] = done
infos[env_id] = info

logger.debug("Got obs batch for actors {}".format(env_ids))
return obs, rewards, dones, infos, {}

def send_actions(self, action_dict):
for env_id, actions in action_dict.items():
actor = self.actors[env_id]
obj_id = actor.step.remote(actions)
self.pending[obj_id] = actor

def try_reset(self, env_id):
obs, _, _, _ = ray.get(self.actors[env_id].reset.remote())
return obs


@ray.remote(num_cpus=0)
class _RemoteMultiAgentEnv(object):
"""Wrapper class for making a multi-agent env a remote actor."""

def __init__(self, make_env, i):
self.env = make_env(i)

def reset(self):
obs = self.env.reset()
# each keyed by agent_id in the env
rew = {agent_id: 0 for agent_id in obs.keys()}
info = {agent_id: {} for agent_id in obs.keys()}
done = {"__all__": False}
return obs, rew, done, info

def step(self, action_dict):
return self.env.step(action_dict)


@ray.remote(num_cpus=0)
class _RemoteSingleAgentEnv(object):
"""Wrapper class for making a gym env a remote actor."""

def __init__(self, make_env, i):
self.env = make_env(i)

def reset(self):
obs = {_DUMMY_AGENT_ID: self.env.reset()}
rew = {agent_id: 0 for agent_id in obs.keys()}
info = {agent_id: {} for agent_id in obs.keys()}
done = {"__all__": False}
return obs, rew, done, info

def step(self, action):
obs, rew, done, info = self.env.step(action[_DUMMY_AGENT_ID])
obs, rew, done, info = [{
_DUMMY_AGENT_ID: x
} for x in [obs, rew, done, info]]
done["__all__"] = done[_DUMMY_AGENT_ID]
return obs, rew, done, info
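
As a minimal usage sketch, ``RemoteVectorEnv`` is normally created indirectly through ``BaseEnv.to_base_env()`` as wired up above (assuming a simple gym env factory):

    import gym
    import ray
    from ray.rllib.env.base_env import BaseEnv

    def make_env(i):
        # Each remote actor constructs its own env instance from this factory.
        return gym.make("CartPole-v1")

    ray.init()
    # Wraps the env in four Ray actors that are polled asynchronously.
    env = BaseEnv.to_base_env(
        make_env(0), make_env=make_env, num_envs=4, async_remote_envs=True)
    obs, rewards, dones, infos, _ = env.poll()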