diff --git a/ci/jenkins_tests/run_rllib_tests.sh b/ci/jenkins_tests/run_rllib_tests.sh
index a110dac12c85..3a899f8806aa 100644
--- a/ci/jenkins_tests/run_rllib_tests.sh
+++ b/ci/jenkins_tests/run_rllib_tests.sh
@@ -62,6 +62,13 @@ docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
     --stop '{"training_iteration": 2}' \
     --config '{"remote_worker_envs": true, "num_envs_per_worker": 2, "num_workers": 1, "train_batch_size": 100, "sgd_minibatch_size": 50}'
 
+docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
+    /ray/python/ray/rllib/tests/run_silent.sh train.py \
+    --env CartPole-v1 \
+    --run PPO \
+    --stop '{"training_iteration": 2}' \
+    --config '{"async_remote_worker_envs": true, "num_envs_per_worker": 2, "num_workers": 1, "train_batch_size": 100, "sgd_minibatch_size": 50}'
+
 docker run --rm --shm-size=${SHM_SIZE} --memory=${MEMORY_SIZE} $DOCKER_SHA \
     /ray/python/ray/rllib/tests/run_silent.sh train.py \
     --env Pendulum-v0 \
diff --git a/doc/source/rllib-env.rst b/doc/source/rllib-env.rst
index a03077c7ed63..7180099fca04 100644
--- a/doc/source/rllib-env.rst
+++ b/doc/source/rllib-env.rst
@@ -66,7 +66,7 @@ For a full runnable code example using the custom environment API, see `custom_e
 
 .. warning::
 
-    Please do **not** try to use gym registration to register custom environments. The gym registry is not compatible with Ray. Instead, always use the registration flows documented above.
+    The gym registry is not compatible with Ray. Instead, always use the registration flows documented above to ensure Ray workers can access the environment.
 
 Configuring Environments
 ------------------------
@@ -119,7 +119,7 @@ Vectorized
 
 RLlib will auto-vectorize Gym envs for batch evaluation if the ``num_envs_per_worker`` config is set, or you can define a custom environment class that subclasses `VectorEnv <https://github.com/ray-project/ray/blob/master/python/ray/rllib/env/vector_env.py>`__ to implement ``vector_step()`` and ``vector_reset()``.
 
-Note that auto-vectorization only applies to policy inference by default. This means that policy inference will be batched, but your envs will still be stepped one at a time. If you would like your envs to be stepped in parallel, you can set ``"remote_worker_envs": True``. This will create env instances in Ray actors and step them in parallel. These remote processes introduce communication overheads, so this only helps if your env is very expensive to step.
+Note that auto-vectorization only applies to policy inference by default. This means that policy inference will be batched, but your envs will still be stepped one at a time. If you would like your envs to be stepped in parallel, you can set ``"remote_worker_envs": True`` or ``"async_remote_worker_envs": True``. This will create env instances in Ray actors and step them in parallel. These remote processes introduce communication overheads, so this only helps if your env is very expensive to step.
 
 Multi-Agent and Hierarchical
 ----------------------------
@@ -319,11 +319,9 @@ Note that envs can read from different partitions of the logs based on the ``wor
 
 .. seealso::
 
-    `RLlib I/O <rllib-offline.html>`__ provides higher-level interfaces for working with offline experience datasets.
+    `Offline Datasets <rllib-offline.html>`__ provide higher-level interfaces for working with offline experience datasets.
 
-Batch Asynchronous
-------------------
+Advanced Integrations
+---------------------
 
-The lowest-level "catch-all" environment supported by RLlib is `BaseEnv <https://github.com/ray-project/ray/blob/master/python/ray/rllib/env/base_env.py>`__. BaseEnv models multiple agents executing asynchronously in multiple environments. A call to ``poll()`` returns observations from ready agents keyed by their environment and agent ids, and actions for those agents can be sent back via ``send_actions()``. This interface can be subclassed directly to support batched simulators such as `ELF <https://github.com/facebookresearch/ELF>`__.
-
-Under the hood, all other envs are converted to BaseEnv by RLlib so that there is a common internal path for policy evaluation.
+For more complex / high-performance environment integrations, you can instead extend the low-level `BaseEnv <https://github.com/ray-project/ray/blob/master/python/ray/rllib/env/base_env.py>`__ class. This low-level API models multiple agents executing asynchronously in multiple environments. A call to ``BaseEnv.poll()`` returns observations from ready agents keyed by their environment and agent ids, and actions for those agents are sent back via ``BaseEnv.send_actions()``. BaseEnv is used to implement all the other env types in RLlib, so it offers a superset of their functionality. For example, ``BaseEnv`` is used to implement dynamic batching of observations for inference over `multiple simulator actors <https://github.com/ray-project/ray/blob/master/python/ray/rllib/env/remote_vector_env.py>`__.
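
As an aside, here is what the ``poll()`` / ``send_actions()`` loop described in the new "Advanced Integrations" section looks like in practice. This is an illustrative sketch only, not part of the patch; it assumes a plain Gym ``CartPole-v0`` env and a dummy constant-action policy standing in for real inference:

    import gym
    from ray.rllib.env.base_env import BaseEnv

    # Wrap a single gym env in the BaseEnv interface; poll() lazily resets it.
    env = BaseEnv.to_base_env(gym.make("CartPole-v0"))
    obs, rewards, dones, infos, _ = env.poll()
    while not dones[0]["__all__"]:
        # Observations are keyed by env_id, then agent_id; echo the same
        # structure back with a fixed action (0) for every ready agent.
        actions = {
            env_id: {agent_id: 0 for agent_id in agent_obs}
            for env_id, agent_obs in obs.items()
        }
        env.send_actions(actions)
        obs, rewards, dones, infos, _ = env.poll()
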
diff --git a/doc/source/rllib.rst b/doc/source/rllib.rst
index a0fe8a0d4c58..6c662f957da9 100644
--- a/doc/source/rllib.rst
+++ b/doc/source/rllib.rst
@@ -40,7 +40,7 @@ Environments
 * `Vectorized <rllib-env.html#vectorized>`__
 * `Multi-Agent and Hierarchical <rllib-env.html#multi-agent-and-hierarchical>`__
 * `Interfacing with External Agents <rllib-env.html#interfacing-with-external-agents>`__
-* `Batch Asynchronous <rllib-env.html#batch-asynchronous>`__
+* `Advanced Integrations <rllib-env.html#advanced-integrations>`__
 
 Algorithms
 ----------
diff --git a/python/ray/rllib/agents/agent.py b/python/ray/rllib/agents/agent.py
index 829c020f5ec6..235d0f704b1f 100644
--- a/python/ray/rllib/agents/agent.py
+++ b/python/ray/rllib/agents/agent.py
@@ -134,6 +134,9 @@
     # remote processes instead of in the same worker. This adds overheads, but
     # can make sense if your envs are very CPU intensive (e.g., for StarCraft).
     "remote_worker_envs": False,
+    # Similar to remote_worker_envs, but runs the envs asynchronously in the
+    # background for greater efficiency. Conflicts with remote_worker_envs.
+    "async_remote_worker_envs": False,
 
     # === Offline Datasets ===
     # __sphinx_doc_input_begin__
@@ -473,9 +476,7 @@ def make_local_evaluator(self,
                     "tf_session_args": self.
config["local_evaluator_tf_session_args"] }), - extra_config or {}), - remote_worker_envs=False, - ) + extra_config or {})) @DeveloperAPI def make_remote_evaluators(self, env_creator, policy_graph, count): @@ -490,14 +491,8 @@ def make_remote_evaluators(self, env_creator, policy_graph, count): cls = PolicyEvaluator.as_remote(**remote_args).remote return [ - self._make_evaluator( - cls, - env_creator, - policy_graph, - i + 1, - self.config, - remote_worker_envs=self.config["remote_worker_envs"]) - for i in range(count) + self._make_evaluator(cls, env_creator, policy_graph, i + 1, + self.config) for i in range(count) ] @DeveloperAPI @@ -563,13 +558,8 @@ def _validate_config(config): "`input_evaluation` must be a list of strings, got {}".format( config["input_evaluation"])) - def _make_evaluator(self, - cls, - env_creator, - policy_graph, - worker_index, - config, - remote_worker_envs=False): + def _make_evaluator(self, cls, env_creator, policy_graph, worker_index, + config): def session_creator(): logger.debug("Creating TF session {}".format( config["tf_session_args"])) @@ -639,7 +629,8 @@ def session_creator(): input_creator=input_creator, input_evaluation=input_evaluation, output_creator=output_creator, - remote_worker_envs=remote_worker_envs) + remote_worker_envs=config["remote_worker_envs"], + async_remote_worker_envs=config["async_remote_worker_envs"]) @override(Trainable) def _export_model(self, export_formats, export_dir): diff --git a/python/ray/rllib/env/base_env.py b/python/ray/rllib/env/base_env.py index 85993f05c862..7dd1921f131d 100644 --- a/python/ray/rllib/env/base_env.py +++ b/python/ray/rllib/env/base_env.py @@ -38,14 +38,22 @@ class BaseEnv(object): "env_0": { "car_0": [2.4, 1.6], "car_1": [3.4, -3.2], - } + }, + "env_1": { + "car_0": [8.0, 4.1], + }, + "env_2": { + "car_0": [2.3, 3.3], + "car_1": [1.4, -0.2], + "car_3": [1.2, 0.1], + }, } >>> env.send_actions( actions={ "env_0": { "car_0": 0, "car_1": 1, - } + }, ... }) >>> obs, rewards, dones, infos, off_policy_actions = env.poll() >>> print(obs) @@ -53,7 +61,7 @@ class BaseEnv(object): "env_0": { "car_0": [4.1, 1.7], "car_1": [3.2, -4.2], - } + }, ... } >>> print(dones) { @@ -61,25 +69,40 @@ class BaseEnv(object): "__all__": False, "car_0": False, "car_1": True, - } + }, ... } """ @staticmethod - def to_base_env(env, make_env=None, num_envs=1, remote_envs=False): + def to_base_env(env, + make_env=None, + num_envs=1, + remote_envs=False, + async_remote_envs=False): """Wraps any env type as needed to expose the async interface.""" - if remote_envs and num_envs == 1: + + from ray.rllib.env.remote_vector_env import RemoteVectorEnv + if (remote_envs or async_remote_envs) and num_envs == 1: raise ValueError( "Remote envs only make sense to use if num_envs > 1 " "(i.e. 
+        if remote_envs and async_remote_envs:
+            raise ValueError("You can only specify one of remote_envs or "
+                             "async_remote_envs.")
+
         if not isinstance(env, BaseEnv):
             if isinstance(env, MultiAgentEnv):
                 if remote_envs:
-                    raise NotImplementedError(
-                        "Remote multiagent environments are not implemented")
-
-                env = _MultiAgentEnvToBaseEnv(
-                    make_env=make_env, existing_envs=[env], num_envs=num_envs)
+                    env = RemoteVectorEnv(
+                        make_env, num_envs, multiagent=True, sync=True)
+                elif async_remote_envs:
+                    env = RemoteVectorEnv(
+                        make_env, num_envs, multiagent=True, sync=False)
+                else:
+                    env = _MultiAgentEnvToBaseEnv(
+                        make_env=make_env,
+                        existing_envs=[env],
+                        num_envs=num_envs)
             elif isinstance(env, ExternalEnv):
                 if num_envs != 1:
                     raise ValueError(
@@ -88,15 +111,21 @@ def to_base_env(env, make_env=None, num_envs=1, remote_envs=False):
             elif isinstance(env, VectorEnv):
                 env = _VectorEnvToBaseEnv(env)
             else:
-                env = VectorEnv.wrap(
-                    make_env=make_env,
-                    existing_envs=[env],
-                    num_envs=num_envs,
-                    remote_envs=remote_envs,
-                    action_space=env.action_space,
-                    observation_space=env.observation_space)
-                env = _VectorEnvToBaseEnv(env)
-        assert isinstance(env, BaseEnv)
+                if remote_envs:
+                    env = RemoteVectorEnv(
+                        make_env, num_envs, multiagent=False, sync=True)
+                elif async_remote_envs:
+                    env = RemoteVectorEnv(
+                        make_env, num_envs, multiagent=False, sync=False)
+                else:
+                    env = VectorEnv.wrap(
+                        make_env=make_env,
+                        existing_envs=[env],
+                        num_envs=num_envs,
+                        action_space=env.action_space,
+                        observation_space=env.observation_space)
+                    env = _VectorEnvToBaseEnv(env)
+        assert isinstance(env, BaseEnv), env
         return env
 
     @PublicAPI
diff --git a/python/ray/rllib/env/remote_vector_env.py b/python/ray/rllib/env/remote_vector_env.py
new file mode 100644
index 000000000000..1f33a739b11e
--- /dev/null
+++ b/python/ray/rllib/env/remote_vector_env.py
@@ -0,0 +1,118 @@
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import logging
+
+import ray
+from ray.rllib.env.base_env import BaseEnv, _DUMMY_AGENT_ID
+
+logger = logging.getLogger(__name__)
+
+
+class RemoteVectorEnv(BaseEnv):
+    """Vector env that executes envs in remote workers.
+
+    This provides dynamic batching of inference as observations are returned
+    from the remote simulator actors. Both single and multi-agent child envs
+    are supported, and envs can be stepped synchronously or async.
+ """ + + def __init__(self, make_env, num_envs, multiagent, sync): + self.make_local_env = make_env + if sync: + self.timeout = 9999999.0 # wait for all envs + else: + self.timeout = 0.0 # wait for only ready envs + + def make_remote_env(i): + logger.info("Launching env {} in remote actor".format(i)) + if multiagent: + return _RemoteMultiAgentEnv.remote(self.make_local_env, i) + else: + return _RemoteSingleAgentEnv.remote(self.make_local_env, i) + + self.actors = [make_remote_env(i) for i in range(num_envs)] + self.pending = None # lazy init + + def poll(self): + if self.pending is None: + self.pending = {a.reset.remote(): a for a in self.actors} + + # each keyed by env_id in [0, num_remote_envs) + obs, rewards, dones, infos = {}, {}, {}, {} + ready = [] + + # Wait for at least 1 env to be ready here + while not ready: + ready, _ = ray.wait( + list(self.pending), + num_returns=len(self.pending), + timeout=self.timeout) + + # Get and return observations for each of the ready envs + env_ids = set() + for obj_id in ready: + actor = self.pending.pop(obj_id) + env_id = self.actors.index(actor) + env_ids.add(env_id) + ob, rew, done, info = ray.get(obj_id) + obs[env_id] = ob + rewards[env_id] = rew + dones[env_id] = done + infos[env_id] = info + + logger.debug("Got obs batch for actors {}".format(env_ids)) + return obs, rewards, dones, infos, {} + + def send_actions(self, action_dict): + for env_id, actions in action_dict.items(): + actor = self.actors[env_id] + obj_id = actor.step.remote(actions) + self.pending[obj_id] = actor + + def try_reset(self, env_id): + obs, _, _, _ = ray.get(self.actors[env_id].reset.remote()) + return obs + + +@ray.remote(num_cpus=0) +class _RemoteMultiAgentEnv(object): + """Wrapper class for making a multi-agent env a remote actor.""" + + def __init__(self, make_env, i): + self.env = make_env(i) + + def reset(self): + obs = self.env.reset() + # each keyed by agent_id in the env + rew = {agent_id: 0 for agent_id in obs.keys()} + info = {agent_id: {} for agent_id in obs.keys()} + done = {"__all__": False} + return obs, rew, done, info + + def step(self, action_dict): + return self.env.step(action_dict) + + +@ray.remote(num_cpus=0) +class _RemoteSingleAgentEnv(object): + """Wrapper class for making a gym env a remote actor.""" + + def __init__(self, make_env, i): + self.env = make_env(i) + + def reset(self): + obs = {_DUMMY_AGENT_ID: self.env.reset()} + rew = {agent_id: 0 for agent_id in obs.keys()} + info = {agent_id: {} for agent_id in obs.keys()} + done = {"__all__": False} + return obs, rew, done, info + + def step(self, action): + obs, rew, done, info = self.env.step(action[_DUMMY_AGENT_ID]) + obs, rew, done, info = [{ + _DUMMY_AGENT_ID: x + } for x in [obs, rew, done, info]] + done["__all__"] = done[_DUMMY_AGENT_ID] + return obs, rew, done, info diff --git a/python/ray/rllib/env/vector_env.py b/python/ray/rllib/env/vector_env.py index e1de12375e5b..d0df24177f38 100644 --- a/python/ray/rllib/env/vector_env.py +++ b/python/ray/rllib/env/vector_env.py @@ -5,7 +5,6 @@ import logging import numpy as np -import ray from ray.rllib.utils.annotations import override, PublicAPI logger = logging.getLogger(__name__) @@ -27,12 +26,8 @@ class VectorEnv(object): def wrap(make_env=None, existing_envs=None, num_envs=1, - remote_envs=False, action_space=None, observation_space=None): - if remote_envs: - return _RemoteVectorizedGymEnv(make_env, num_envs, action_space, - observation_space) return _VectorizedGymEnv(make_env, existing_envs or [], num_envs, action_space, 
diff --git a/python/ray/rllib/env/vector_env.py b/python/ray/rllib/env/vector_env.py
index e1de12375e5b..d0df24177f38 100644
--- a/python/ray/rllib/env/vector_env.py
+++ b/python/ray/rllib/env/vector_env.py
@@ -5,7 +5,6 @@
 import logging
 import numpy as np
 
-import ray
 from ray.rllib.utils.annotations import override, PublicAPI
 
 logger = logging.getLogger(__name__)
@@ -27,12 +26,8 @@ class VectorEnv(object):
     def wrap(make_env=None,
              existing_envs=None,
              num_envs=1,
-             remote_envs=False,
              action_space=None,
             observation_space=None):
-        if remote_envs:
-            return _RemoteVectorizedGymEnv(make_env, num_envs, action_space,
-                                           observation_space)
         return _VectorizedGymEnv(make_env, existing_envs or [], num_envs,
                                  action_space, observation_space)
 
@@ -129,71 +124,3 @@ def vector_step(self, actions):
     @override(VectorEnv)
     def get_unwrapped(self):
         return self.envs
-
-
-@ray.remote(num_cpus=0)
-class _RemoteEnv(object):
-    """Wrapper class for making a gym env a remote actor."""
-
-    def __init__(self, make_env, i):
-        self.env = make_env(i)
-
-    def reset(self):
-        return self.env.reset()
-
-    def step(self, action):
-        return self.env.step(action)
-
-
-class _RemoteVectorizedGymEnv(_VectorizedGymEnv):
-    """Internal wrapper for gym envs to implement VectorEnv as remote workers.
-    """
-
-    def __init__(self,
-                 make_env,
-                 num_envs,
-                 action_space=None,
-                 observation_space=None):
-        self.make_local_env = make_env
-        self.num_envs = num_envs
-        self.initialized = False
-        self.action_space = action_space
-        self.observation_space = observation_space
-
-    def _initialize_if_needed(self):
-        if self.initialized:
-            return
-
-        self.initialized = True
-
-        def make_remote_env(i):
-            logger.info("Launching env {} in remote actor".format(i))
-            return _RemoteEnv.remote(self.make_local_env, i)
-
-        _VectorizedGymEnv.__init__(self, make_remote_env, [], self.num_envs,
-                                   self.action_space, self.observation_space)
-
-        for env in self.envs:
-            assert isinstance(env, ray.actor.ActorHandle), env
-
-    @override(_VectorizedGymEnv)
-    def vector_reset(self):
-        self._initialize_if_needed()
-        return ray.get([env.reset.remote() for env in self.envs])
-
-    @override(_VectorizedGymEnv)
-    def reset_at(self, index):
-        return ray.get(self.envs[index].reset.remote())
-
-    @override(_VectorizedGymEnv)
-    def vector_step(self, actions):
-        step_outs = ray.get(
-            [env.step.remote(act) for env, act in zip(self.envs, actions)])
-
-        obs_batch, rew_batch, done_batch, info_batch = [], [], [], []
-        for obs, rew, done, info in step_outs:
-            obs_batch.append(obs)
-            rew_batch.append(rew)
-            done_batch.append(done)
-            info_batch.append(info)
-        return obs_batch, rew_batch, done_batch, info_batch
diff --git a/python/ray/rllib/evaluation/policy_evaluator.py b/python/ray/rllib/evaluation/policy_evaluator.py
index 71e2009a1882..e83b6a209a46 100644
--- a/python/ray/rllib/evaluation/policy_evaluator.py
+++ b/python/ray/rllib/evaluation/policy_evaluator.py
@@ -122,7 +122,8 @@ def __init__(self,
                  input_creator=lambda ioctx: ioctx.default_sampler_input(),
                  input_evaluation=frozenset([]),
                  output_creator=lambda ioctx: NoopOutput(),
-                 remote_worker_envs=False):
+                 remote_worker_envs=False,
+                 async_remote_worker_envs=False):
         """Initialize a policy evaluator.
 
         Arguments:
@@ -201,6 +202,8 @@ def __init__(self,
                 those new envs in remote processes instead of in the current
                 process. This adds overheads, but can make sense if your envs
                 are very CPU intensive (e.g., for StarCraft).
+            async_remote_worker_envs (bool): Similar to remote_worker_envs,
+                but runs the envs asynchronously in the background.
""" if log_level: @@ -307,7 +310,8 @@ def make_env(vector_index): self.env, make_env=make_env, num_envs=num_envs, - remote_envs=remote_worker_envs) + remote_envs=remote_worker_envs, + async_remote_envs=async_remote_worker_envs) self.num_envs = num_envs if self.batch_mode == "truncate_episodes": diff --git a/python/ray/rllib/tests/test_multi_agent_env.py b/python/ray/rllib/tests/test_multi_agent_env.py index 99f00ccaf434..6eeca3ef22a0 100644 --- a/python/ray/rllib/tests/test_multi_agent_env.py +++ b/python/ray/rllib/tests/test_multi_agent_env.py @@ -334,6 +334,38 @@ def testMultiAgentSample(self): self.assertEqual(batch.policy_batches["p0"]["t"].tolist(), list(range(25)) * 6) + def testMultiAgentSampleSyncRemote(self): + act_space = gym.spaces.Discrete(2) + obs_space = gym.spaces.Discrete(2) + ev = PolicyEvaluator( + env_creator=lambda _: BasicMultiAgent(5), + policy_graph={ + "p0": (MockPolicyGraph, obs_space, act_space, {}), + "p1": (MockPolicyGraph, obs_space, act_space, {}), + }, + policy_mapping_fn=lambda agent_id: "p{}".format(agent_id % 2), + batch_steps=50, + num_envs=4, + remote_worker_envs=True) + batch = ev.sample() + self.assertEqual(batch.count, 200) + + def testMultiAgentSampleAsyncRemote(self): + act_space = gym.spaces.Discrete(2) + obs_space = gym.spaces.Discrete(2) + ev = PolicyEvaluator( + env_creator=lambda _: BasicMultiAgent(5), + policy_graph={ + "p0": (MockPolicyGraph, obs_space, act_space, {}), + "p1": (MockPolicyGraph, obs_space, act_space, {}), + }, + policy_mapping_fn=lambda agent_id: "p{}".format(agent_id % 2), + batch_steps=50, + num_envs=4, + async_remote_worker_envs=True) + batch = ev.sample() + self.assertEqual(batch.count, 200) + def testMultiAgentSampleWithHorizon(self): act_space = gym.spaces.Discrete(2) obs_space = gym.spaces.Discrete(2) @@ -621,5 +653,5 @@ def testTrainMultiCartpoleManyPolicies(self): if __name__ == "__main__": - ray.init() + ray.init(num_cpus=4) unittest.main(verbosity=2)