18 changes: 2 additions & 16 deletions doc/source/rllib-package-ref.rst
@@ -1,25 +1,11 @@
RLlib Package Reference
=======================

ray.rllib.agents
ray.rllib.policy
----------------

.. automodule:: ray.rllib.agents
.. automodule:: ray.rllib.policy
:members:

.. autoclass:: ray.rllib.agents.a3c.A2CTrainer
.. autoclass:: ray.rllib.agents.a3c.A3CTrainer
.. autoclass:: ray.rllib.agents.ddpg.ApexDDPGTrainer
.. autoclass:: ray.rllib.agents.ddpg.DDPGTrainer
.. autoclass:: ray.rllib.agents.dqn.ApexTrainer
.. autoclass:: ray.rllib.agents.dqn.DQNTrainer
.. autoclass:: ray.rllib.agents.es.ESTrainer
.. autoclass:: ray.rllib.agents.pg.PGTrainer
.. autoclass:: ray.rllib.agents.impala.ImpalaTrainer
.. autoclass:: ray.rllib.agents.ppo.APPOTrainer
.. autoclass:: ray.rllib.agents.ppo.PPOTrainer
.. autoclass:: ray.rllib.agents.marwil.MARWILTrainer


ray.rllib.env
-------------
27 changes: 5 additions & 22 deletions python/ray/rllib/agents/ddpg/apex.py
@@ -2,15 +2,14 @@
from __future__ import division
from __future__ import print_function

from ray.rllib.agents.dqn.apex import APEX_TRAINER_PROPERTIES
from ray.rllib.agents.ddpg.ddpg import DDPGTrainer, \
DEFAULT_CONFIG as DDPG_CONFIG
from ray.rllib.utils.annotations import override
from ray.rllib.utils import merge_dicts

APEX_DDPG_DEFAULT_CONFIG = merge_dicts(
DDPG_CONFIG, # see also the options in ddpg.py, which are also supported
{
"optimizer_class": "AsyncReplayOptimizer",
"optimizer": merge_dicts(
DDPG_CONFIG["optimizer"], {
"max_weight_sync_delay": 400,
@@ -32,23 +31,7 @@
},
)


class ApexDDPGTrainer(DDPGTrainer):
"""DDPG variant that uses the Ape-X distributed policy optimizer.

By default, this is configured for a large single node (32 cores). For
running in a large cluster, increase the `num_workers` config var.
"""

_name = "APEX_DDPG"
_default_config = APEX_DDPG_DEFAULT_CONFIG

@override(DDPGTrainer)
def update_target_if_needed(self):
# Ape-X updates based on num steps trained, not sampled
if self.optimizer.num_steps_trained - self.last_target_update_ts > \
self.config["target_network_update_freq"]:
self.workers.local_worker().foreach_trainable_policy(
lambda p, _: p.update_target())
self.last_target_update_ts = self.optimizer.num_steps_trained
self.num_target_updates += 1
ApexDDPGTrainer = DDPGTrainer.with_updates(
name="APEX_DDPG",
default_config=APEX_DDPG_DEFAULT_CONFIG,
**APEX_TRAINER_PROPERTIES)
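
The class-based ApexDDPGTrainer above is replaced by a single DDPGTrainer.with_updates call, which derives a new trainer purely from overrides. As a rough, self-contained sketch of that builder idea (illustrative only, not RLlib's actual trainer template, which accepts many more hooks such as make_workers, make_policy_optimizer, and after_optimizer_step):

# Simplified, self-contained sketch of a `with_updates`-style class builder.
# Names and fields here are hypothetical stand-ins, not RLlib internals.


class BaseTrainer(object):
    _name = "BASE"
    _default_config = {"lr": 1e-3}

    @classmethod
    def with_updates(cls, name=None, default_config=None, **overrides):
        """Return a subclass with the given attributes replaced."""
        attrs = {}
        if name is not None:
            attrs["_name"] = name
        if default_config is not None:
            attrs["_default_config"] = default_config
        attrs.update(overrides)  # e.g. hook functions keyed by hook name
        return type(name or cls.__name__, (cls,), attrs)


# Mirrors the diff: a new trainer defined only by its name/config/hook deltas.
ApexLikeTrainer = BaseTrainer.with_updates(
    name="MY_APEX", default_config={"lr": 1e-4})
print(ApexLikeTrainer._name, ApexLikeTrainer._default_config)
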
109 changes: 63 additions & 46 deletions python/ray/rllib/agents/ddpg/ddpg.py
@@ -3,9 +3,9 @@
from __future__ import print_function

from ray.rllib.agents.trainer import with_common_config
from ray.rllib.agents.dqn.dqn import DQNTrainer
from ray.rllib.agents.dqn.dqn import GenericOffPolicyTrainer, \
update_worker_explorations
from ray.rllib.agents.ddpg.ddpg_policy import DDPGTFPolicy
from ray.rllib.utils.annotations import override
from ray.rllib.utils.schedules import ConstantSchedule, LinearSchedule

# yapf: disable
@@ -97,6 +97,11 @@
# optimization on initial policy parameters. Note that this will be
# disabled when the action noise scale is set to 0 (e.g. during evaluation).
"pure_exploration_steps": 1000,
# Extra configuration that disables exploration.
"evaluation_config": {
"exploration_fraction": 0,
"exploration_final_eps": 0,
},

# === Replay buffer ===
# Size of the replay buffer. Note that if async_updates is set, then
@@ -108,6 +113,11 @@
"prioritized_replay_alpha": 0.6,
# Beta parameter for sampling from prioritized replay buffer.
"prioritized_replay_beta": 0.4,
# Fraction of entire training period over which the beta parameter is
# annealed
"beta_annealing_fraction": 0.2,
# Final value of beta
"final_prioritized_replay_beta": 0.4,
# Epsilon to add to the TD errors when updating priorities.
"prioritized_replay_eps": 1e-6,
# Whether to LZ4 compress observations
@@ -146,8 +156,6 @@
# to increase if your environment is particularly slow to sample, or if
# you're using the Async or Ape-X optimizers.
"num_workers": 0,
# Optimizer class to use.
"optimizer_class": "SyncReplayOptimizer",
# Whether to use a distribution of epsilons across workers for exploration.
"per_worker_exploration": False,
# Whether to compute priorities on workers.
@@ -159,47 +167,56 @@
# yapf: enable


class DDPGTrainer(DQNTrainer):
"""DDPG implementation in TensorFlow."""
_name = "DDPG"
_default_config = DEFAULT_CONFIG
_policy = DDPGTFPolicy
def make_exploration_schedule(config, worker_index):
# Modification of DQN's schedule to take into account
# `exploration_ou_noise_scale`
if config["per_worker_exploration"]:
assert config["num_workers"] > 1, "This requires multiple workers"
if worker_index >= 0:
# FIXME: what do magic constants mean? (0.4, 7)
max_index = float(config["num_workers"] - 1)
exponent = 1 + worker_index / max_index * 7
return ConstantSchedule(0.4**exponent)
else:
# local ev should have zero exploration so that eval rollouts
# run properly
return ConstantSchedule(0.0)
elif config["exploration_should_anneal"]:
return LinearSchedule(
schedule_timesteps=int(config["exploration_fraction"] *
config["schedule_max_timesteps"]),
initial_p=1.0,
final_p=config["exploration_final_scale"])
else:
# *always* add exploration noise
return ConstantSchedule(1.0)


def setup_ddpg_exploration(trainer):
trainer.exploration0 = make_exploration_schedule(trainer.config, -1)
trainer.explorations = [
make_exploration_schedule(trainer.config, i)
for i in range(trainer.config["num_workers"])
]

@override(DQNTrainer)
def _train(self):
pure_expl_steps = self.config["pure_exploration_steps"]
if pure_expl_steps:
# tell workers whether they should do pure exploration
only_explore = self.global_timestep < pure_expl_steps
self.workers.local_worker().foreach_trainable_policy(

def add_pure_exploration_phase(trainer):
global_timestep = trainer.optimizer.num_steps_sampled
pure_expl_steps = trainer.config["pure_exploration_steps"]
if pure_expl_steps:
# tell workers whether they should do pure exploration
only_explore = global_timestep < pure_expl_steps
trainer.workers.local_worker().foreach_trainable_policy(
lambda p, _: p.set_pure_exploration_phase(only_explore))
for e in trainer.workers.remote_workers():
e.foreach_trainable_policy.remote(
lambda p, _: p.set_pure_exploration_phase(only_explore))
for e in self.workers.remote_workers():
e.foreach_trainable_policy.remote(
lambda p, _: p.set_pure_exploration_phase(only_explore))
return super(DDPGTrainer, self)._train()

@override(DQNTrainer)
def _make_exploration_schedule(self, worker_index):
# Override DQN's schedule to take into account
# `exploration_ou_noise_scale`
if self.config["per_worker_exploration"]:
assert self.config["num_workers"] > 1, \
"This requires multiple workers"
if worker_index >= 0:
# FIXME: what do magic constants mean? (0.4, 7)
max_index = float(self.config["num_workers"] - 1)
exponent = 1 + worker_index / max_index * 7
return ConstantSchedule(0.4**exponent)
else:
# local ev should have zero exploration so that eval rollouts
# run properly
return ConstantSchedule(0.0)
elif self.config["exploration_should_anneal"]:
return LinearSchedule(
schedule_timesteps=int(self.config["exploration_fraction"] *
self.config["schedule_max_timesteps"]),
initial_p=1.0,
final_p=self.config["exploration_final_scale"])
else:
# *always* add exploration noise
return ConstantSchedule(1.0)
update_worker_explorations(trainer)


DDPGTrainer = GenericOffPolicyTrainer.with_updates(
name="DDPG",
default_config=DEFAULT_CONFIG,
default_policy=DDPGTFPolicy,
before_init=setup_ddpg_exploration,
before_train_step=add_pure_exploration_phase)
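
With the exploration schedule and pure-exploration logic factored into plain functions, DDPGTrainer is now assembled from GenericOffPolicyTrainer plus the before_init and before_train_step hooks above. A minimal usage sketch, assuming a local Ray installation with the pre-1.0 ray.rllib.agents layout shown in this diff and Gym's Pendulum-v0 environment; config values are illustrative only:

# Hedged usage sketch, not part of this PR; assumes the pre-1.0 agents API.
import ray
from ray.rllib.agents.ddpg import DDPGTrainer

ray.init()
trainer = DDPGTrainer(
    env="Pendulum-v0",
    config={
        "num_workers": 0,                # single-process sampling
        "pure_exploration_steps": 1000,  # handled by add_pure_exploration_phase
    })
for i in range(3):
    result = trainer.train()  # the before_train_step hook fires each iteration
    print(i, result["episode_reward_mean"])
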
16 changes: 8 additions & 8 deletions python/ray/rllib/agents/ddpg/td3.py
@@ -1,3 +1,9 @@
"""A more stable successor to TD3.

By default, this uses a near-identical configuration to that reported in the
TD3 paper.
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
@@ -36,7 +42,6 @@
"train_batch_size": 100,
"use_huber": False,
"target_network_update_freq": 0,
"optimizer_class": "SyncReplayOptimizer",
"num_workers": 0,
"num_gpus_per_worker": 0,
"per_worker_exploration": False,
@@ -48,10 +53,5 @@
},
)


class TD3Trainer(DDPGTrainer):
"""A more stable successor to TD3. By default, this uses a near-identical
configuration to that reported in the TD3 paper."""

_name = "TD3"
_default_config = TD3_DEFAULT_CONFIG
TD3Trainer = DDPGTrainer.with_updates(
name="TD3", default_config=TD3_DEFAULT_CONFIG)
63 changes: 45 additions & 18 deletions python/ray/rllib/agents/dqn/apex.py
@@ -3,15 +3,14 @@
from __future__ import print_function

from ray.rllib.agents.dqn.dqn import DQNTrainer, DEFAULT_CONFIG as DQN_CONFIG
from ray.rllib.optimizers import AsyncReplayOptimizer
from ray.rllib.utils import merge_dicts
from ray.rllib.utils.annotations import override

# yapf: disable
# __sphinx_doc_begin__
APEX_DEFAULT_CONFIG = merge_dicts(
DQN_CONFIG, # see also the options in dqn.py, which are also supported
{
"optimizer_class": "AsyncReplayOptimizer",
"optimizer": merge_dicts(
DQN_CONFIG["optimizer"], {
"max_weight_sync_delay": 400,
@@ -36,22 +35,50 @@
# yapf: enable


class ApexTrainer(DQNTrainer):
"""DQN variant that uses the Ape-X distributed policy optimizer.
def defer_make_workers(trainer, env_creator, policy, config):
# Hack to work around https://github.com/ray-project/ray/issues/2541
# The workers will be created later, after the optimizer is created
return trainer._make_workers(env_creator, policy, config, 0)

By default, this is configured for a large single node (32 cores). For
running in a large cluster, increase the `num_workers` config var.
"""

_name = "APEX"
_default_config = APEX_DEFAULT_CONFIG
def make_async_optimizer(workers, config):
assert len(workers.remote_workers()) == 0
extra_config = config["optimizer"].copy()
for key in [
"prioritized_replay", "prioritized_replay_alpha",
"prioritized_replay_beta", "prioritized_replay_eps"
]:
if key in config:
extra_config[key] = config[key]
opt = AsyncReplayOptimizer(
workers,
learning_starts=config["learning_starts"],
buffer_size=config["buffer_size"],
train_batch_size=config["train_batch_size"],
sample_batch_size=config["sample_batch_size"],
**extra_config)
workers.add_workers(config["num_workers"])
opt._set_workers(workers.remote_workers())
return opt

@override(DQNTrainer)
def update_target_if_needed(self):
# Ape-X updates based on num steps trained, not sampled
if self.optimizer.num_steps_trained - self.last_target_update_ts > \
self.config["target_network_update_freq"]:
self.workers.local_worker().foreach_trainable_policy(
lambda p, _: p.update_target())
self.last_target_update_ts = self.optimizer.num_steps_trained
self.num_target_updates += 1

def update_target_based_on_num_steps_trained(trainer, fetches):
# Ape-X updates based on num steps trained, not sampled
if (trainer.optimizer.num_steps_trained -
trainer.state["last_target_update_ts"] >
trainer.config["target_network_update_freq"]):
trainer.workers.local_worker().foreach_trainable_policy(
lambda p, _: p.update_target())
trainer.state["last_target_update_ts"] = (
trainer.optimizer.num_steps_trained)
trainer.state["num_target_updates"] += 1


APEX_TRAINER_PROPERTIES = {
"make_workers": defer_make_workers,
"make_policy_optimizer": make_async_optimizer,
"after_optimizer_step": update_target_based_on_num_steps_trained,
}

ApexTrainer = DQNTrainer.with_updates(
name="APEX", default_config=APEX_DEFAULT_CONFIG, **APEX_TRAINER_PROPERTIES)