Vectorized environments #1513

Merged: 29 commits into openai:master on Jun 21, 2019

Conversation

@tristandeleu (Contributor) commented Jun 2, 2019

One of the most useful features in openai/baselines is the ability to run multiple environments in parallel with the SubprocVecEnv wrapper. However, in order to benefit from this feature, it is necessary to install the whole baselines package, which includes some heavy dependencies such as TensorFlow. To avoid this dependency, starting a new project often means copy/pasting the snippet of code for SubprocVecEnv. I am proposing to add this functionality to Gym, so that it can be used directly out of the box.

TL;DR

import gym
env = gym.vector.make('CartPole-v1', num_envs=5)
observations = env.reset()
# array([[-0.04071225, -0.00272771,  0.04585911,  0.03397448],
#        [ 0.04807664, -0.00466015,  0.03836329, -0.04897803],
#        [ 0.03999503,  0.03953767, -0.02474664, -0.00865895],
#        [-0.03199483,  0.01612429, -0.03704762, -0.02390875],
#        [ 0.00967298,  0.01544605,  0.04391582,  0.0040252 ]],
#       dtype=float32)
  • Adding a base class VectorEnv, inheriting from gym.Env. This ensures that the vectorized environments are still valid instances of gym.Env. In particular, the step method still returns a tuple (observations, rewards, dones, infos). However, the difference is that rewards and dones are now arrays (instead of scalars).
  • Adding SyncVectorEnv and AsyncVectorEnv. Roughly speaking, SyncVectorEnv is equivalent to DummyVecEnv, and AsyncVectorEnv is a unified API for both SubprocVecEnv and ShmemVecEnv.
    • SyncVectorEnv runs the environments serially.
    • AsyncVectorEnv uses multiprocessing, and runs the environments in parallel.
  • AsyncVectorEnv can use shared memory across processes to make data transfer more efficient when observations are large (e.g. images). It supports all types of Gym spaces, including Dict and Tuple (see the sketch after the example below).
  • AsyncVectorEnv offers low-level control over the reset and step operations. In particular, these methods accept an optional timeout argument to shut down the processes after a certain period, to avoid any hanging process.
  • The convenience function gym.vector.make() is the equivalent of gym.make(), with multiple copies of the environment wrapped in a vectorized environment.
import gym

env = gym.vector.make('BreakoutDeterministic-v4', num_envs=8)
env.seed(0)
observations = env.reset()
# observations.shape: (8, 210, 160, 3)

actions = env.action_space.sample()
observations, rewards, dones, infos = env.step(actions)
# rewards: array([0., 0., 0., 0., 0., 0., 0., 0.])
# dones: array([False, False, False, False, False, False, False, False])
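
For completeness, here is a minimal sketch of using the two classes directly from a list of environment factories (the shared_memory keyword shown here is assumed to be the switch for the shared-memory transfer described above):

import gym
from gym.vector import AsyncVectorEnv, SyncVectorEnv

# Each element is a callable that builds one copy of the environment.
env_fns = [lambda: gym.make('CartPole-v1') for _ in range(5)]

# SyncVectorEnv steps the copies serially in the current process.
sync_env = SyncVectorEnv(env_fns)

# AsyncVectorEnv steps them in parallel worker processes; shared_memory is
# assumed to toggle the shared-memory observation buffer.
async_env = AsyncVectorEnv(env_fns, shared_memory=True)

observations = sync_env.reset()
# observations.shape: (5, 4) for CartPole-v1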

EDIT:

  • Added a max_retries argument to AsyncVectorEnv. This automatically restarts processes that failed, up to a maximum number of retries.

Bonus:

import gym

env = gym.vector.make('HandReach-v0', num_envs=5)
env.observation_space
# Dict(achieved_goal:Box(5, 15), desired_goal:Box(5, 15), observation:Box(5, 63))
observations = env.reset()

observations.keys()
# odict_keys(['achieved_goal', 'desired_goal', 'observation'])
observations['observation'].shape
# (5, 63)

@zuoxingdong (Contributor):

@tristandeleu That looks really great! One potential issue with the proposed API: what if the user would like to wrap the "atomic" environments first, before wrapping them into a vectorized environment?

@tristandeleu (Contributor, Author):

Thank you! The gym.vector.make function is meant for very simple cases, where you don't want to wrap the atomic environments. If you want to use wrappers around the environments, you can still use the lower level SyncVectorEnv and AsyncVectorEnv. For example:

import gym
from gym.vector import AsyncVectorEnv
from gym.wrappers import AtariPreprocessing

def make_env(env_id):
    def _make():
        env = gym.make(env_id)
        env = AtariPreprocessing(env)
        return env
    return _make

env_fns = [make_env('BreakoutDeterministic-v4') for _ in range(5)]
env = AsyncVectorEnv(env_fns)
observations = env.reset()
# observations.shape: (5, 84, 84)

@tristandeleu (Contributor, Author) commented Jun 6, 2019

I added the episodic argument to both SyncVectorEnv and AsyncVectorEnv to override the default behavior of calling reset after an environment finishes an episode. The episodic argument is also available in the gym.vector.make function. This allows you to do something like

import gym
import numpy as np

env = gym.vector.make('CartPole-v1', num_envs=8, episodic=True)
observations = env.reset()
dones = np.zeros(env.num_envs, dtype=np.bool_)

while not dones.all():
    actions = env.action_space.sample()
    observations, rewards, dones, infos = env.step(actions)

@pzhokhov (Collaborator) commented Jun 7, 2019

@tristandeleu Thanks, this looks great! In fact we have bounced this idea internally for a while, great to see the initiative taken by the community :) Given the size of the PR, however, I'd like to take more time to read through it carefully; also putting @christopherhesse into the loop.

@christopherhesse (Contributor):

Having a standard vectorized environment in gym would be really nice. I'll write a few comments on this PR.


__all__ = ['AsyncVectorEnv', 'SyncVectorEnv', 'VectorEnv', 'make']

def make(id, num_envs=1, asynchronous=True, episodic=False, **kwargs):
Contributor:

When is episodic=True used? I can't reset individual environments, so I'd have to reset them all at once right?

Contributor Author:

Yes, episodic=True means that the individual environments do not reset automatically after finishing an episode. You get only a single episode from each environment, and you are responsible for resetting the environments yourself. This is meant to cover the frequent pattern:

while not done:
    observation, reward, done, info = env.step(action)

Contributor:

How does that cover that pattern? Seems like I want episodic=True in that case or else I'll step past the end of some environments.

Contributor Author:

Yes, in that case you'd like episodic=True. Here is an example

import gym
import numpy as np

env = gym.vector.make('CubeCrash-v0', 5, episodic=True)
observations = env.reset()
dones = np.zeros(env.num_envs, dtype=np.bool_)
# This terminates after getting 1 episode per environment
while not dones.all():
    actions = env.action_space.sample()
    observations, rewards, dones, _ = env.step(actions)

env = gym.vector.make('CubeCrash-v0', 5, episodic=False)
observations = env.reset()
dones = np.zeros(env.num_envs, dtype=np.bool_)
# This doesn't terminate
while not dones.all():
    actions = env.action_space.sample()
    observations, rewards, dones, _ = env.step(actions)

This allows you to either

  • Get a fixed number of episodes. Say you have 5 environments and you want 20 episodes overall; with episodic=True you can just loop 4 times through this (and you are responsible for calling reset):
env = gym.vector.make('CubeCrash-v0', 5, episodic=True)
for _ in range(4):
    observations = env.reset()
    dones = np.zeros(env.num_envs, dtype=np.bool_)

    while not dones.all():
        actions = env.action_space.sample()
        observations, rewards, dones, _ = env.step(actions)
  • Or get an unbounded number of episodes, without having to reset the environments yourself. This is the behavior you get in baselines, and it is the default here.

Contributor:

I think this is a little dangerous, since it's easy to confuse the past-done "undefined" observations with the good ones in this case. What we normally do if you want 20 episodes is use episodic=False and count the number of dones (done_count += np.sum(dones), for instance). Any thoughts @pzhokhov?
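
A minimal sketch of that counting pattern, under the API proposed in this PR (the 20-episode target is only illustrative):

import gym
import numpy as np

env = gym.vector.make('CubeCrash-v0', 5)  # default: environments auto-reset after each episode
observations = env.reset()

target_episodes = 20
done_count = 0
while done_count < target_episodes:
    actions = env.action_space.sample()
    observations, rewards, dones, infos = env.step(actions)
    done_count += np.sum(dones)  # every True entry marks one finished episode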

Collaborator:

episodic=True seems dangerous to me as well (I left a separate comment in the code about that); while I can see cases where it is useful, I think a proper implementation will require env-specific details (like what counts as a valid observation after done, and what goes in the info dictionary).

return True
return False

def close_extras(self, timeout=None, terminate=False):
Contributor:

The existing SubprocVecEnv doesn't seem to need the terminate or timeout arguments. While I could see an argument for timeout, why add terminate?

Contributor:

Actually, what's the argument for timeout as well? We haven't needed it in baselines.

Contributor Author:

The idea behind timeout is to leave open the possibility of closing the individual environments gracefully, without having to wait too long (for example, if something went wrong in one of the individual environments and left it hanging forever). terminate is the extreme version of this, where the user wants to close the environments without waiting at all.
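
For illustration, a rough sketch of how these arguments could be used; whether close() forwards its keyword arguments to close_extras is an assumption here, so the sketch calls close_extras directly:

import gym
from gym.vector import AsyncVectorEnv

env_fns = [lambda: gym.make('CartPole-v1') for _ in range(4)]
env = AsyncVectorEnv(env_fns)
env.reset()

# Ask the worker processes to shut down gracefully, but give up after 5 seconds.
env.close_extras(timeout=5)

# Extreme version: kill the worker processes immediately, without waiting.
# env.close_extras(terminate=True)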

Contributor:

Sorry, that wasn't a very clear question. I realize why they're there; it's just that I haven't used an environment where they were necessary. It seems like something that would be handled by a wrapper instead of the environment vectorizer. Do you have a case where this is actually used?

@tristandeleu (Contributor, Author) commented Jun 13, 2019

I have never had to use it, but I feel that giving the user finer control over this, if they need it, might be a good idea. I think it makes sense to have it in the base class for AsyncVectorEnv, especially for handling the termination of processes, but I'm completely open to a wrapper-based solution if you have an idea. The default behavior is the same as in baselines.

@christopherhesse (Contributor):

Overall, this looks good. It simplifies some things but adds some complexity over the baselines implementation, and it's not clear that the extra complexity is necessary.

@christopherhesse (Contributor):

Also, it looks like the existing tests did not make it into this PR: https://github.com/openai/baselines/blob/master/baselines/common/vec_env/test_vec_env.py

It would be nice to have those here as well.
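
As an illustration of the kind of check those tests perform, here is a minimal sketch (not the baselines tests themselves) comparing the serial and parallel implementations on seeded environments:

import gym
import numpy as np
from gym.vector import AsyncVectorEnv, SyncVectorEnv

def make_env(env_id, seed):
    def _make():
        env = gym.make(env_id)
        env.seed(seed)
        return env
    return _make

env_fns = [make_env('CartPole-v1', seed) for seed in range(4)]
sync_env = SyncVectorEnv(env_fns)
async_env = AsyncVectorEnv(env_fns)

# With identical seeds, both implementations should produce the same rollout.
assert np.allclose(sync_env.reset(), async_env.reset())

actions = np.zeros(4, dtype=np.int64)  # a fixed action keeps the rollouts comparable
sync_obs, sync_rewards, _, _ = sync_env.step(actions)
async_obs, async_rewards, _, _ = async_env.step(actions)
assert np.allclose(sync_obs, async_obs)
assert np.allclose(sync_rewards, async_rewards)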

@tristandeleu (Contributor, Author):

I have removed the episodic and automatic restart features from the current PR, and have them ready for future PRs once this one is merged.

@christopherhesse (Contributor):

Awesome, thanks @tristandeleu! It looks to me like the only remaining item is to split call_wait/call_async/call/get_attr/set_attr out into a separate PR. Sorry again for the long cycle here, but it's a rather large PR. If everything on this PR looks good on Friday, hopefully we can merge it then.

The docstrings and tests look great btw.

@tristandeleu (Contributor, Author):

I removed call/get_attr/set_attr from the current PR; they are also ready for another PR once this one is merged.

@pzhokhov (Collaborator):

LGTM! Thanks for all the effort, this looks very nice!

@pzhokhov merged commit c6a97e1 into openai:master on Jun 21, 2019
@christopherhesse (Contributor):

Thanks again @tristandeleu, having vectorized envs in gym is nice, and the cleanup over what baselines did is also good.

@christopherhesse (Contributor):

I filed some issues I found with this: https://github.com/openai/gym/labels/VectorEnv

zlig pushed a commit to zlig/gym that referenced this pull request Apr 24, 2020
* Initial version of vectorized environments

* Raise an exception in the main process if child process raises an exception

* Add list of exposed functions in vector module

* Use deepcopy instead of np.copy

* Add documentation for vector utils

* Add tests for copy in AsyncVectorEnv

* Add example in documentation for batch_space

* Add cloudpickle dependency in setup.py

* Fix __del__ in VectorEnv

* Check if all observation spaces are equal in AsyncVectorEnv

* Check if all observation spaces are equal in SyncVectorEnv

* Fix spaces non equality in SyncVectorEnv for Python 2

* Handle None parameter in create_empty_array

* Fix check_observation_space with spaces equality

* Raise an exception when operations are out of order in AsyncVectorEnv

* Add version requirement for cloudpickle

* Use a state instead of binary flags in AsyncVectorEnv

* Use numpy.zeros when initializing observations in vectorized environments

* Remove poll from public API in AsyncVectorEnv

* Remove close_extras from VectorEnv

* Add test between AsyncVectorEnv and SyncVectorEnv

* Remove close in check_observation_space

* Add documentation for seed and close

* Refactor exceptions for AsyncVectorEnv

* Close pipes if the environment raises an error

* Add tests for out of order operations

* Change default argument in create_empty_array to np.zeros

* Add get_attr and set_attr methods to VectorEnv

* Improve consistency in SyncVectorEnv
@tristandeleu mentioned this pull request on Dec 15, 2021