Merged
Changes from all commits (50 commits)
682ae7e  wip (ericl, May 27, 2018)
846a3a6  cls (ericl, May 27, 2018)
a5e1416  re (ericl, May 27, 2018)
cfb77be  wip (ericl, May 28, 2018)
7966e63  Merge branch 'fix-classmethod' into v2-refactor (ericl, May 28, 2018)
3c07c29  wip (ericl, May 28, 2018)
332683c  a3c working (ericl, May 28, 2018)
3cea2c9  torch support (ericl, May 28, 2018)
d7472e5  pg works (ericl, May 28, 2018)
b4a782b  lint (ericl, May 28, 2018)
8738fa3  rm v2 (ericl, May 28, 2018)
a88957c  consumer id (ericl, May 28, 2018)
370abf0  clean up pg (ericl, May 28, 2018)
6c2bcbb  clean up more (ericl, May 28, 2018)
56429fb  fix python 2.7 (ericl, May 28, 2018)
2380c8f  Merge branch 'fix-classmethod' into v2-refactor (ericl, May 28, 2018)
f16f8f0  tf session management (ericl, May 28, 2018)
71d78b5  docs (ericl, May 28, 2018)
5ab8723  dqn wip (ericl, May 29, 2018)
c6d68ff  fix compile (ericl, May 29, 2018)
fa015ff  dqn (ericl, May 29, 2018)
e2a41a9  apex runs (ericl, May 29, 2018)
84624fe  up (ericl, May 29, 2018)
3c4a9fd  impotrs (ericl, May 29, 2018)
c56dcef  ddpg (ericl, May 29, 2018)
04220bf  quotes (ericl, May 29, 2018)
6f5ef1b  Merge remote-tracking branch 'upstream/master' into v2-refactor (ericl, May 29, 2018)
95a69df  fix tests (ericl, May 29, 2018)
c62a236  fix last r (ericl, May 29, 2018)
a9090a4  fix tests (ericl, May 29, 2018)
a63efae  lint (ericl, May 29, 2018)
19db8bd  pass checkpoint restore (ericl, May 29, 2018)
c2b4243  kwar (ericl, May 29, 2018)
0e56fd4  nits (ericl, May 30, 2018)
ed0b359  policy graph (ericl, May 30, 2018)
70ea79d  fix yapf (ericl, May 30, 2018)
496946f  com (ericl, May 30, 2018)
53b4e55  class (ericl, May 30, 2018)
da02fa9  Merge remote-tracking branch 'upstream/master' into v2-refactor (ericl, May 30, 2018)
3657108  pyt (ericl, May 30, 2018)
1f435f7  update (ericl, Jun 7, 2018)
6dbd0e8  Merge remote-tracking branch 'upstream/master' into v2-refactor (ericl, Jun 7, 2018)
f910464  test cpe (ericl, Jun 7, 2018)
5685e32  unit test (ericl, Jun 7, 2018)
f2af5dc  fix ddpg2 (ericl, Jun 7, 2018)
ad9a205  args (ericl, Jun 8, 2018)
1b9b192  faster test (ericl, Jun 8, 2018)
21cecdd  common (ericl, Jun 8, 2018)
b4ef184  updates (ericl, Jun 9, 2018)
80577c8  updates (ericl, Jun 9, 2018)
9 changes: 9 additions & 0 deletions python/ray/rllib/__init__.py
@@ -6,6 +6,11 @@
 # This file is imported from the tune module in order to register RLlib agents.
 from ray.tune.registry import register_trainable
 
+from ray.rllib.utils.policy_graph import PolicyGraph
+from ray.rllib.utils.tf_policy_graph import TFPolicyGraph
+from ray.rllib.utils.common_policy_evaluator import CommonPolicyEvaluator
+from ray.rllib.optimizers.sample_batch import SampleBatch
+
 
 def _register_all():
     for key in ["PPO", "ES", "DQN", "APEX", "A3C", "BC", "PG", "DDPG",
@@ -16,3 +21,7 @@ def _register_all():
 
 
 _register_all()
+
+__all__ = [
+    "PolicyGraph", "TFPolicyGraph", "CommonPolicyEvaluator", "SampleBatch"
+]
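For reference, the four names added to __all__ above can now be imported from the package top level. A minimal sketch, assuming a build of this branch is installed; the num_gpus value is illustrative:

    from ray.rllib import (
        PolicyGraph, TFPolicyGraph, CommonPolicyEvaluator, SampleBatch)

    # Remote evaluator classes are built with the as_remote() helper, the same
    # call a3c.py makes below.
    RemoteEvaluator = CommonPolicyEvaluator.as_remote(num_gpus=0)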
102 changes: 49 additions & 53 deletions python/ray/rllib/a3c/a3c.py
@@ -2,22 +2,21 @@
 from __future__ import division
 from __future__ import print_function
 
-import numpy as np
 import pickle
 import os
 
 import ray
 from ray.rllib.agent import Agent
 from ray.rllib.optimizers import AsyncOptimizer
 from ray.rllib.utils import FilterManager
-from ray.rllib.a3c.a3c_evaluator import A3CEvaluator, RemoteA3CEvaluator, \
-    GPURemoteA3CEvaluator
-from ray.tune.result import TrainingResult
+from ray.rllib.utils.common_policy_evaluator import CommonPolicyEvaluator, \
+    collect_metrics
+from ray.rllib.a3c.common import get_policy_cls
 from ray.tune.trial import Resources
 
 DEFAULT_CONFIG = {
     # Number of workers (excluding master)
-    "num_workers": 4,
+    "num_workers": 2,
     # Size of rollout batch
     "batch_size": 10,
     # Use LSTM model - only applicable for image states
@@ -42,6 +41,8 @@
     "entropy_coeff": -0.01,
     # Whether to place workers on GPUs
     "use_gpu_for_workers": False,
+    # Whether to emit extra summary stats
+    "summarize": False,
     # Model and preprocessor options
     "model": {
         # (Image statespace) - Converts image to Channels = 1
@@ -78,56 +79,48 @@ def default_resource_request(cls, config):
             extra_gpu=cf["use_gpu_for_workers"] and cf["num_workers"] or 0)
 
     def _init(self):
-        self.local_evaluator = A3CEvaluator(
-            self.registry,
-            self.env_creator,
-            self.config,
-            self.logdir,
-            start_sampler=False)
-        if self.config["use_gpu_for_workers"]:
-            remote_cls = GPURemoteA3CEvaluator
+        self.policy_cls = get_policy_cls(self.config)
+
+        if self.config["use_pytorch"]:
+            session_creator = None
         else:
-            remote_cls = RemoteA3CEvaluator
+            import tensorflow as tf
+
+            def session_creator():
+                return tf.Session(
+                    config=tf.ConfigProto(
+                        intra_op_parallelism_threads=1,
+                        inter_op_parallelism_threads=1,
+                        gpu_options=tf.GPUOptions(allow_growth=True)))
+
+        remote_cls = CommonPolicyEvaluator.as_remote(
+            num_gpus=1 if self.config["use_gpu_for_workers"] else 0)
+        self.local_evaluator = CommonPolicyEvaluator(
+            self.env_creator, self.policy_cls,
+            batch_steps=self.config["batch_size"],
+            batch_mode="truncate_episodes",
+            tf_session_creator=session_creator,
+            registry=self.registry, env_config=self.config["env_config"],
+            model_config=self.config["model"], policy_config=self.config)
         self.remote_evaluators = [
-            remote_cls.remote(self.registry, self.env_creator, self.config,
-                              self.logdir)
-            for i in range(self.config["num_workers"])
-        ]
-        self.optimizer = AsyncOptimizer(self.config["optimizer"],
-                                        self.local_evaluator,
-                                        self.remote_evaluators)
+            remote_cls.remote(
+                self.env_creator, self.policy_cls,
+                batch_steps=self.config["batch_size"],
+                batch_mode="truncate_episodes", sample_async=True,
+                tf_session_creator=session_creator,
+                registry=self.registry, env_config=self.config["env_config"],
+                model_config=self.config["model"], policy_config=self.config)
+            for i in range(self.config["num_workers"])]
+
+        self.optimizer = AsyncOptimizer(
+            self.config["optimizer"], self.local_evaluator,
+            self.remote_evaluators)
 
     def _train(self):
         self.optimizer.step()
-        FilterManager.synchronize(self.local_evaluator.filters,
-                                  self.remote_evaluators)
-        res = self._fetch_metrics_from_remote_evaluators()
-        return res
-
-    def _fetch_metrics_from_remote_evaluators(self):
-        episode_rewards = []
-        episode_lengths = []
-        metric_lists = [
-            a.get_completed_rollout_metrics.remote()
-            for a in self.remote_evaluators
-        ]
-        for metrics in metric_lists:
-            for episode in ray.get(metrics):
-                episode_lengths.append(episode.episode_length)
-                episode_rewards.append(episode.episode_reward)
-        avg_reward = (np.mean(episode_rewards)
-                      if episode_rewards else float('nan'))
-        avg_length = (np.mean(episode_lengths)
-                      if episode_lengths else float('nan'))
-        timesteps = np.sum(episode_lengths) if episode_lengths else 0
-
-        result = TrainingResult(
-            episode_reward_mean=avg_reward,
-            episode_len_mean=avg_length,
-            timesteps_this_iter=timesteps,
-            info={})
-
-        return result
+        FilterManager.synchronize(
+            self.local_evaluator.filters, self.remote_evaluators)
+        return collect_metrics(self.local_evaluator, self.remote_evaluators)
 
     def _stop(self):
         # workaround for https://github.com/ray-project/ray/issues/1516
@@ -154,7 +147,10 @@ def _restore(self, checkpoint_path):
         ])
         self.local_evaluator.restore(extra_data["local_state"])
 
-    def compute_action(self, observation):
+    def compute_action(self, observation, state=None):
+        if state is None:
+            state = []
         obs = self.local_evaluator.obs_filter(observation, update=False)
-        action, info = self.local_evaluator.policy.compute(obs)
-        return action
+        return self.local_evaluator.for_policy(
+            lambda p: p.compute_single_action(
+                obs, state, is_training=False)[0])
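Usage sketch for the refactored agent; the A3CAgent name and its config/env constructor keywords are assumed from the Agent API on master at the time and are not shown in this hunk:

    import gym
    import ray
    from ray.rllib import a3c

    ray.init()
    agent = a3c.A3CAgent(config={"num_workers": 2}, env="CartPole-v0")
    result = agent.train()  # AsyncOptimizer.step() + collect_metrics(...)
    print(result.episode_reward_mean)

    # compute_action now accepts an optional recurrent-state list and routes
    # the call through CommonPolicyEvaluator.for_policy -> compute_single_action.
    obs = gym.make("CartPole-v0").reset()
    action = agent.compute_action(obs)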
119 changes: 0 additions & 119 deletions python/ray/rllib/a3c/a3c_evaluator.py

This file was deleted.

103 changes: 103 additions & 0 deletions python/ray/rllib/a3c/a3c_tf_policy.py
@@ -0,0 +1,103 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import tensorflow as tf
import gym

from ray.rllib.utils.error import UnsupportedSpaceException
from ray.rllib.utils.process_rollout import compute_advantages
from ray.rllib.utils.tf_policy_graph import TFPolicyGraph


class A3CTFPolicyGraph(TFPolicyGraph):
"""The TF policy base class."""

def __init__(self, ob_space, action_space, registry, config):
self.registry = registry
self.local_steps = 0
self.config = config
self.summarize = config.get("summarize")

self._setup_graph(ob_space, action_space)
assert all(hasattr(self, attr)
for attr in ["vf", "logits", "x", "var_list"])
print("Setting up loss")
self.setup_loss(action_space)
self.is_training = tf.placeholder_with_default(True, ())
self.sess = tf.get_default_session()

TFPolicyGraph.__init__(
self, self.sess, obs_input=self.x,
action_sampler=self.action_dist.sample(), loss=self.loss,
loss_inputs=self.loss_in, is_training=self.is_training,
state_inputs=self.state_in, state_outputs=self.state_out)

self.sess.run(tf.global_variables_initializer())

if self.summarize:
bs = tf.to_float(tf.shape(self.x)[0])
tf.summary.scalar("model/policy_graph", self.pi_loss / bs)
tf.summary.scalar("model/value_loss", self.vf_loss / bs)
tf.summary.scalar("model/entropy", self.entropy / bs)
tf.summary.scalar("model/grad_gnorm", tf.global_norm(self._grads))
tf.summary.scalar("model/var_gnorm", tf.global_norm(self.var_list))
self.summary_op = tf.summary.merge_all()

def _setup_graph(self, ob_space, ac_space):
raise NotImplementedError

def setup_loss(self, action_space):
if isinstance(action_space, gym.spaces.Box):
ac_size = action_space.shape[0]
self.ac = tf.placeholder(tf.float32, [None, ac_size], name="ac")
elif isinstance(action_space, gym.spaces.Discrete):
self.ac = tf.placeholder(tf.int64, [None], name="ac")
else:
raise UnsupportedSpaceException(
"Action space {} is not supported for A3C.".format(
action_space))
self.adv = tf.placeholder(tf.float32, [None], name="adv")
self.r = tf.placeholder(tf.float32, [None], name="r")

log_prob = self.action_dist.logp(self.ac)

# The "policy gradients" loss: its derivative is precisely the policy
# gradient. Notice that self.ac is a placeholder that is provided
# externally. adv will contain the advantages, as calculated in
# compute_advantages.
self.pi_loss = - tf.reduce_sum(log_prob * self.adv)

delta = self.vf - self.r
self.vf_loss = 0.5 * tf.reduce_sum(tf.square(delta))
self.entropy = tf.reduce_sum(self.action_dist.entropy())
self.loss = (self.pi_loss +
self.vf_loss * self.config["vf_loss_coeff"] +
self.entropy * self.config["entropy_coeff"])

def optimizer(self):
return tf.train.AdamOptimizer(self.config["lr"])

def gradients(self, optimizer):
grads = tf.gradients(self.loss, self.var_list)
self.grads, _ = tf.clip_by_global_norm(grads, self.config["grad_clip"])
clipped_grads = list(zip(self.grads, self.var_list))
return clipped_grads

def extra_compute_grad_fetches(self):
if self.summarize:
return {"summary": self.summary_op}
else:
return {}

def postprocess_trajectory(self, sample_batch, other_agent_batches=None):
completed = sample_batch["dones"][-1]
if completed:
last_r = 0.0
else:
next_state = []
for i in range(len(self.state_in)):
next_state.append([sample_batch["state_out_{}".format(i)][-1]])
last_r = self.value(sample_batch["new_obs"][-1], *next_state)
return compute_advantages(
sample_batch, last_r, self.config["gamma"], self.config["lambda"])
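For reference, the objective assembled in setup_loss is pi_loss + vf_loss_coeff * vf_loss + entropy_coeff * entropy. A standalone NumPy sketch of the same arithmetic on a made-up two-step batch; the 0.5 value-loss coefficient is illustrative, while entropy_coeff = -0.01 matches the A3C DEFAULT_CONFIG earlier in this diff:

    import numpy as np

    log_prob = np.log([0.5, 0.25])   # log pi(a_t | s_t), from action_dist.logp
    adv = np.array([1.0, 0.5])       # advantages from compute_advantages
    vf = np.array([0.9, 0.4])        # value function predictions
    r = np.array([1.0, 0.0])         # discounted returns
    entropy = 0.69 + 1.10            # sum of per-step policy entropies

    pi_loss = -np.sum(log_prob * adv)           # policy-gradient surrogate
    vf_loss = 0.5 * np.sum(np.square(vf - r))   # squared value error
    loss = pi_loss + 0.5 * vf_loss + (-0.01) * entropy
    print(pi_loss, vf_loss, loss)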