2 changes: 1 addition & 1 deletion doc/source/rllib-models.rst
@@ -233,7 +233,7 @@ In this example we overrode existing methods of the existing DDPG policy graph,
Variable-length / Parametric Action Spaces
------------------------------------------

Custom models can be used to work with environments where (1) the set of valid actions varies per step, and/or (2) the number of valid actions is very large, as in `OpenAI Five <https://neuro.cs.ut.ee/the-use-of-embeddings-in-openai-five/>`__ and `Horizon <https://arxiv.org/abs/1811.00260>`__. The general idea is that the meaning of actions can be completely conditioned on the observation, that is, the ``a`` in ``Q(s, a)`` is just a token in ``[0, MAX_AVAIL_ACTIONS)`` that only has meaning in the context of ``s``. This works with algorithms in the `DQN and policy-gradient families <rllib-env.html>`__ and can be implemented as follows:
Custom models can be used to work with environments where (1) the set of valid actions varies per step, and/or (2) the number of valid actions is very large, as in `OpenAI Five <https://neuro.cs.ut.ee/the-use-of-embeddings-in-openai-five/>`__ and `Horizon <https://arxiv.org/abs/1811.00260>`__. The general idea is that the meaning of actions can be completely conditioned on the observation, i.e., the ``a`` in ``Q(s, a)`` becomes just a token in ``[0, MAX_AVAIL_ACTIONS)`` that only has meaning in the context of ``s``. This works with algorithms in the `DQN and policy-gradient families <rllib-env.html>`__ and can be implemented as follows:

1. The environment should return a mask and/or list of valid action embeddings as part of the observation for each step. To enable batching, the number of actions can be allowed to vary from 1 to some max number:

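The documentation's own code block for this step is not shown in this diff excerpt. As a rough illustration only, here is a minimal sketch of such an environment, assuming a ``Dict`` observation space with an ``action_mask`` field; the class and field names are illustrative, not taken from this PR:

import gym
import numpy as np
from gym.spaces import Box, Dict, Discrete

MAX_AVAIL_ACTIONS = 10

class ParametricActionsEnv(gym.Env):
    """Toy env whose observation carries a mask over the valid actions."""

    def __init__(self):
        self.action_space = Discrete(MAX_AVAIL_ACTIONS)
        self.observation_space = Dict({
            # 1 where the action is currently valid, 0 otherwise
            "action_mask": Box(0, 1, shape=(MAX_AVAIL_ACTIONS, )),
            "real_obs": Box(-1, 1, shape=(4, )),
        })

    def reset(self):
        self.mask = np.zeros(MAX_AVAIL_ACTIONS)
        self.mask[:2] = 1  # only the first two actions are valid initially
        return {"action_mask": self.mask, "real_obs": np.zeros(4)}

    def step(self, action):
        assert self.mask[action] == 1, "invalid action sent by the policy"
        obs = {"action_mask": self.mask, "real_obs": np.zeros(4)}
        return obs, 0.0, True, {}
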
3 changes: 3 additions & 0 deletions python/ray/rllib/agents/agent.py
@@ -61,6 +61,8 @@
# Whether to clip rewards prior to experience postprocessing. Setting to
# None means clip for Atari only.
"clip_rewards": None,
# Whether to np.clip() actions to the action space low/high range spec.
"clip_actions": True,
Contributor: when would you want this to be false?

Contributor Author: For backwards compatibility I think it's a reasonable option to have.

# Whether to use rllib or deepmind preprocessors by default
"preprocessor_pref": "deepmind",

@@ -226,6 +228,7 @@ def session_creator():
num_envs=config["num_envs_per_worker"],
observation_filter=config["observation_filter"],
clip_rewards=config["clip_rewards"],
clip_actions=config["clip_actions"],
env_config=config["env_config"],
model_config=config["model"],
policy_config=config,
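
For context, a usage sketch of the new flag; ``PPOAgent`` and ``Pendulum-v0`` are plausible choices for the RLlib API of this era but are assumptions here, and only the ``clip_actions`` key itself is defined by this diff:

import ray
from ray.rllib.agents.ppo import PPOAgent

ray.init()

# Disable the new behavior to keep pre-existing semantics, as discussed
# in the review thread above (the default added here is clip_actions=True).
agent = PPOAgent(
    env="Pendulum-v0",
    config={"clip_actions": False},
)
print(agent.train()["episode_reward_mean"])
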
9 changes: 7 additions & 2 deletions python/ray/rllib/evaluation/policy_evaluator.py
@@ -100,6 +100,7 @@ def __init__(self,
num_envs=1,
observation_filter="NoFilter",
clip_rewards=None,
clip_actions=True,
env_config=None,
model_config=None,
policy_config=None,
@@ -155,6 +156,8 @@ def __init__(self,
clip_rewards (bool): Whether to clip rewards to [-1, 1] prior to
experience postprocessing. Setting to None means clip for Atari
only.
clip_actions (bool): Whether to clip action values to the range
specified by the policy action space.
env_config (dict): Config to pass to the env creator.
model_config (dict): Config to use when creating the policy model.
policy_config (dict): Config to pass to the policy. In the
@@ -289,7 +292,8 @@ def make_env(vector_index):
self.callbacks,
horizon=episode_horizon,
pack=pack_episodes,
tf_sess=self.tf_sess)
tf_sess=self.tf_sess,
clip_actions=clip_actions)
self.sampler.start()
else:
self.sampler = SyncSampler(
@@ -302,7 +306,8 @@ def make_env(vector_index):
self.callbacks,
horizon=episode_horizon,
pack=pack_episodes,
tf_sess=self.tf_sess)
tf_sess=self.tf_sess,
clip_actions=clip_actions)

logger.debug("Created evaluator with env {} ({}), policies {}".format(
self.async_env, self.env, self.policy_map))
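
At the evaluator level the new argument can be passed directly. A rough sketch, assuming the ``PolicyEvaluator``/``PGPolicyGraph`` construction pattern from the RLlib docs of this era (names outside this diff are assumptions):

import gym
from ray.rllib.agents.pg.pg_policy_graph import PGPolicyGraph
from ray.rllib.evaluation import PolicyEvaluator

# Sample with raw (unclipped) actions sent to the env, i.e. the
# pre-change behavior; the default introduced here is clip_actions=True.
evaluator = PolicyEvaluator(
    env_creator=lambda _: gym.make("Pendulum-v0"),
    policy_graph=PGPolicyGraph,
    clip_actions=False,
)
batch = evaluator.sample()
print(batch["actions"][:5])
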
53 changes: 46 additions & 7 deletions python/ray/rllib/evaluation/sampler.py
@@ -2,6 +2,7 @@
from __future__ import division
from __future__ import print_function

import gym
from collections import defaultdict, namedtuple
import logging
import numpy as np
@@ -47,7 +48,8 @@ def __init__(self,
callbacks,
horizon=None,
pack=False,
tf_sess=None):
tf_sess=None,
clip_actions=True):
self.async_vector_env = AsyncVectorEnv.wrap_async(env)
self.unroll_length = unroll_length
self.horizon = horizon
@@ -58,7 +60,8 @@ def __init__(self,
self.rollout_provider = _env_runner(
self.async_vector_env, self.extra_batches.put, self.policies,
self.policy_mapping_fn, self.unroll_length, self.horizon,
self._obs_filters, clip_rewards, pack, callbacks, tf_sess)
self._obs_filters, clip_rewards, clip_actions, pack, callbacks,
tf_sess)
self.metrics_queue = queue.Queue()

def get_data(self):
@@ -104,7 +107,8 @@ def __init__(self,
callbacks,
horizon=None,
pack=False,
tf_sess=None):
tf_sess=None,
clip_actions=True):
for _, f in obs_filters.items():
assert getattr(f, "is_concurrent", False), \
"Observation Filter must support concurrent updates."
@@ -123,6 +127,7 @@ def __init__(self,
self.pack = pack
self.tf_sess = tf_sess
self.callbacks = callbacks
self.clip_actions = clip_actions

def run(self):
try:
@@ -135,8 +140,8 @@ def _run(self):
rollout_provider = _env_runner(
self.async_vector_env, self.extra_batches.put, self.policies,
self.policy_mapping_fn, self.unroll_length, self.horizon,
self._obs_filters, self.clip_rewards, self.pack, self.callbacks,
self.tf_sess)
self._obs_filters, self.clip_rewards, self.clip_actions, self.pack,
self.callbacks, self.tf_sess)
while True:
# The timeout variable exists because apparently, if one worker
# dies, the other workers won't die with it, unless the timeout is
@@ -197,6 +202,7 @@ def _env_runner(async_vector_env,
horizon,
obs_filters,
clip_rewards,
clip_actions,
pack,
callbacks,
tf_sess=None):
@@ -217,6 +223,7 @@
clip_rewards (bool): Whether to clip rewards before postprocessing.
pack (bool): Whether to pack multiple episodes into each batch. This
guarantees batches will be exactly `unroll_length` in size.
clip_actions (bool): Whether to clip actions to the space range.
callbacks (dict): User callbacks to run on episode events.
tf_sess (Session|None): Optional tensorflow session to use for batching
TF policy evaluations.
@@ -272,7 +279,7 @@ def new_episode():

# Do batched policy eval
eval_results = _do_policy_eval(tf_sess, to_eval, policies,
active_episodes)
active_episodes, clip_actions)

# Process results and update episode state
actions_to_send = _process_policy_eval_results(
@@ -413,7 +420,7 @@ def _process_observations(async_vector_env, policies, batch_builder_pool,
return active_envs, to_eval, outputs


def _do_policy_eval(tf_sess, to_eval, policies, active_episodes):
def _do_policy_eval(tf_sess, to_eval, policies, active_episodes, clip_actions):
"""Call compute actions on observation batches to get next actions.

Returns:
@@ -448,6 +455,13 @@ def _do_policy_eval(tf_sess, to_eval, policies, active_episodes):
for k, v in pending_fetches.items():
eval_results[k] = builder.get(v)

if clip_actions:
for policy_id, results in eval_results.items():
policy = _get_or_raise(policies, policy_id)
actions, rnn_out_cols, pi_info_cols = results
eval_results[policy_id] = (_clip_actions(
actions, policy.action_space), rnn_out_cols, pi_info_cols)

return eval_results


@@ -516,6 +530,31 @@ def _fetch_atari_metrics(async_vector_env):
return atari_out


def _clip_actions(actions, space):
"""Called to clip actions to the specified range of this policy.

Arguments:
actions: Batch of actions or TupleActions.
space: Action space the actions should be present in.

Returns:
Clipped batch of actions.
"""

if isinstance(space, gym.spaces.Box):
return np.clip(actions, space.low, space.high)
elif isinstance(space, gym.spaces.Tuple):
if not isinstance(actions, TupleActions):
raise ValueError("Expected tuple space for actions {}: {}".format(
actions, space))
out = []
for a, s in zip(actions.batches, space.spaces):
out.append(_clip_actions(a, s))
return TupleActions(out)
else:
return actions


def _unbatch_tuple_actions(action_batch):
# convert list of batches -> batch of lists
if isinstance(action_batch, TupleActions):
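
As a standalone illustration of the clipping semantics added above (not part of the diff; only the ``Box`` case is shown, since the ``Tuple`` case simply recurses per component):

import numpy as np
from gym.spaces import Box

space = Box(low=-1.0, high=1.0, shape=(2, ))
actions = np.array([[1.7, -0.2],
                    [-3.0, 0.5]])  # raw batched policy outputs

# Same operation _clip_actions() applies for Box spaces.
print(np.clip(actions, space.low, space.high))
# [[ 1.  -0.2]
#  [-1.   0.5]]
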
44 changes: 2 additions & 42 deletions python/ray/rllib/models/action_dist.py
@@ -95,19 +95,10 @@ class DiagGaussian(ActionDistribution):
second half the gaussian standard deviations.
"""

def __init__(self, inputs, low=None, high=None):
def __init__(self, inputs):
ActionDistribution.__init__(self, inputs)
mean, log_std = tf.split(inputs, 2, axis=1)
self.mean = mean
self.low = low
self.high = high

# Squash to range if specified. We use a sigmoid here this to avoid the
# mean drifting too far past the bounds and causing nan outputs.
# https://github.com/ray-project/ray/issues/1862
if low is not None:
self.mean = low + tf.sigmoid(self.mean) * (high - low)

self.log_std = log_std
self.std = tf.exp(log_std)

@@ -131,10 +122,7 @@ def entropy(self):
reduction_indices=[1])

def sample(self):
out = self.mean + self.std * tf.random_normal(tf.shape(self.mean))
if self.low is not None:
out = tf.clip_by_value(out, self.low, self.high)
return out
return self.mean + self.std * tf.random_normal(tf.shape(self.mean))


class Deterministic(ActionDistribution):
@@ -147,34 +135,6 @@ def sample(self):
return self.inputs


def squash_to_range(dist_cls, low, high):
"""Squashes an action distribution to a range in (low, high).

Arguments:
dist_cls (class): ActionDistribution class to wrap.
low (float|array): Scalar value or array of values.
high (float|array): Scalar value or array of values.
"""

class SquashToRangeWrapper(dist_cls):
def __init__(self, inputs):
dist_cls.__init__(self, inputs, low=low, high=high)

def logp(self, x):
return dist_cls.logp(self, x)

def kl(self, other):
return dist_cls.kl(self, other)

def entropy(self):
return dist_cls.entropy(self)

def sample(self):
return dist_cls.sample(self)

return SquashToRangeWrapper


class MultiActionDistribution(ActionDistribution):
"""Action distribution that operates for list of actions.

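
For intuition, a small NumPy sketch of the (now unsquashed) diagonal Gaussian sampling that remains after this change; the numbers are illustrative:

import numpy as np

# Mirrors tf.split(inputs, 2, axis=1): first half means, second half log-stds.
inputs = np.array([[0.3, -0.1, -0.7, -0.7]])
mean, log_std = np.split(inputs, 2, axis=1)
std = np.exp(log_std)

sample = mean + std * np.random.randn(*mean.shape)
# Out-of-bounds samples are no longer squashed or clipped here; with
# clip_actions=True the sampler clips them to the action space bounds instead.
print(sample)
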
10 changes: 5 additions & 5 deletions python/ray/rllib/models/catalog.py
@@ -15,8 +15,7 @@
from ray.rllib.env.external_env import ExternalEnv
from ray.rllib.env.vector_env import VectorEnv
from ray.rllib.models.action_dist import (
Categorical, Deterministic, DiagGaussian, MultiActionDistribution,
squash_to_range)
Categorical, Deterministic, DiagGaussian, MultiActionDistribution)
from ray.rllib.models.preprocessors import get_preprocessor
from ray.rllib.models.fcnet import FullyConnectedNetwork
from ray.rllib.models.visionnet import VisionNetwork
@@ -38,7 +37,7 @@
"fcnet_hiddens": [256, 256],
# For control envs, documented in ray.rllib.models.Model
"free_log_std": False,
# Whether to squash the action output to space range
# (deprecated) Whether to use sigmoid to squash actions to space range
"squash_to_range": False,

# == LSTM ==
@@ -114,8 +113,9 @@ def get_action_dist(action_space, config, dist_type=None):
if dist_type is None:
dist = DiagGaussian
if config.get("squash_to_range"):
dist = squash_to_range(dist, action_space.low,
action_space.high)
raise ValueError(
"The squash_to_range option is deprecated. See the "
"clip_actions agent option instead.")
return dist, action_space.shape[0] * 2
elif dist_type == "deterministic":
return Deterministic, action_space.shape[0]
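
A migration sketch for configs that relied on the deprecated option (both keys appear in this diff; the surrounding dict structure is illustrative):

# Before: squash the Gaussian mean into the action range via a sigmoid.
old_config = {
    "model": {
        "squash_to_range": True,  # now raises ValueError in get_action_dist()
    },
}

# After: sample unsquashed actions and clip them in the sampler instead.
new_config = {
    "clip_actions": True,  # agent-level default introduced by this change
}
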
26 changes: 16 additions & 10 deletions python/ray/rllib/test/test_supported_spaces.py
@@ -120,22 +120,22 @@ def testAll(self):
stats,
check_bounds=True)
check_support("DQN", {"timesteps_per_iteration": 1}, stats)
check_support("A3C", {
"num_workers": 1,
"optimizer": {
"grads_per_step": 1
}
}, stats)
check_support(
"A3C", {
"num_workers": 1,
"optimizer": {
"grads_per_step": 1
}
},
stats,
check_bounds=True)
check_support(
"PPO", {
"num_workers": 1,
"num_sgd_iter": 1,
"train_batch_size": 10,
"sample_batch_size": 10,
"sgd_minibatch_size": 1,
"model": {
"squash_to_range": True
},
},
stats,
check_bounds=True)
@@ -153,7 +153,13 @@ def testAll(self):
"num_rollouts": 1,
"rollouts_used": 1
}, stats)
check_support("PG", {"num_workers": 1, "optimizer": {}}, stats)
check_support(
"PG", {
"num_workers": 1,
"optimizer": {}
},
stats,
check_bounds=True)
num_unexpected_errors = 0
for (alg, a_name, o_name), stat in sorted(stats.items()):
if stat not in ["ok", "unsupported"]:
33 changes: 21 additions & 12 deletions python/ray/rllib/tuned_examples/pong-ppo.yaml
@@ -1,17 +1,26 @@
# On a Tesla K80 GPU, this achieves the maximum reward in about 1-1.5 hours.
# On a single GPU, this achieves maximum reward in ~15-20 minutes.
#
# $ python train.py -f tuned_examples/pong-ppo.yaml --ray-num-gpus=1
# $ python train.py -f tuned_examples/pong-ppo.yaml
#
# - PPO_PongDeterministic-v4_0: TERMINATED [pid=16387], 4984 s, 1117981 ts, 21 rew
# - PPO_PongDeterministic-v4_0: TERMINATED [pid=83606], 4592 s, 1068671 ts, 21 rew
#
pong-deterministic-ppo:
env: PongDeterministic-v4
pong-ppo:
env: PongNoFrameskip-v4
run: PPO
stop:
episode_reward_mean: 21
config:
gamma: 0.99
num_workers: 4
num_sgd_iter: 20
lambda: 0.95
kl_coeff: 0.5
clip_rewards: True
clip_param: 0.1
vf_clip_param: 10.0
entropy_coeff: 0.01
train_batch_size: 5000
sample_batch_size: 20
sgd_minibatch_size: 500
num_sgd_iter: 10
num_workers: 32
num_envs_per_worker: 5
batch_mode: truncate_episodes
observation_filter: NoFilter
vf_share_layers: true
num_gpus: 1
model:
dim: 42
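
The tuned example can also be launched from Python rather than via ``train.py``; a sketch assuming the ``ray.tune`` API of this era, with the YAML above remaining the source of truth for the config:

import yaml

import ray
from ray import tune

with open("python/ray/rllib/tuned_examples/pong-ppo.yaml") as f:
    experiments = yaml.load(f)  # {"pong-ppo": {"env": ..., "run": "PPO", ...}}

ray.init()
tune.run_experiments(experiments)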