mlp_policy.py

from baselines.common.mpi_running_mean_std import RunningMeanStd
import baselines.common.tf_util as U
import tensorflow as tf
import gym
from baselines.common.distributions import make_pdtype
import numpy as np

def dense3D2(x, size, name, option, num_options=1, weight_init=None, bias=True):
    """Dense layer with one weight matrix (and bias) per option; the `option`
    placeholder selects which set of parameters is used for the whole batch."""
    w = tf.get_variable(name + "/w", [num_options, x.get_shape()[1], size], initializer=weight_init)
    ret = tf.matmul(x, w[option[0]])
    if bias:
        b = tf.get_variable(name + "/b", [num_options, size], initializer=tf.zeros_initializer())
        return ret + b[option[0]]
    else:
        return ret
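
# Illustrative NumPy sketch of what dense3D2 computes (shapes taken from the
# layer's definition above). Note that the entire batch is routed through the
# single option index option[0]; per-sample option indices are not supported.
#
#   w = np.random.randn(num_options, in_dim, size)   # one weight matrix per option
#   b = np.zeros((num_options, size))                # one bias vector per option
#   out = x @ w[option[0]] + b[option[0]]            # shape: (batch, size)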

class MlpPolicy(object):
    recurrent = False

    def __init__(self, name, *args, **kwargs):
        with tf.variable_scope(name):
            self._init(*args, **kwargs)
            self.scope = tf.get_variable_scope().name

    def _init(self, ob_space, ac_space, hid_size, num_hid_layers, gaussian_fixed_var=True, num_options=2, dc=0):
        assert isinstance(ob_space, gym.spaces.Box)
        self.dc = dc  # deliberation cost, added to the termination advantage
        self.num_options = num_options
        self.pdtype = pdtype = make_pdtype(ac_space)
        sequence_length = None

        ob = U.get_placeholder(name="ob", dtype=tf.float32, shape=[sequence_length] + list(ob_space.shape))
        option = U.get_placeholder(name="option", dtype=tf.int32, shape=[None])

        # Normalize observations with a running mean/std, clipped to [-5, 5].
        with tf.variable_scope("obfilter"):
            self.ob_rms = RunningMeanStd(shape=ob_space.shape)
        obz = tf.clip_by_value((ob - self.ob_rms.mean) / self.ob_rms.std, -5.0, 5.0)

        # Value head: tanh MLP trunk with one output per option.
        last_out = obz
        for i in range(num_hid_layers):
            last_out = tf.nn.tanh(U.dense(last_out, hid_size, "vffc%i" % (i + 1), weight_init=U.normc_initializer(1.0)))
        self.vpred = dense3D2(last_out, 1, "vffinal", option, num_options=num_options, weight_init=U.normc_initializer(1.0))[:, 0]

        # Termination head: per-option sigmoid probability of ending the
        # current option; stop_gradient keeps it from training the value trunk.
        self.tpred = tf.nn.sigmoid(dense3D2(tf.stop_gradient(last_out), 1, "termhead", option, num_options=num_options, weight_init=U.normc_initializer(1.0)))[:, 0]
        termination_sample = tf.greater(self.tpred, tf.random_uniform(shape=tf.shape(self.tpred), maxval=1.))

        # Intra-option policy: a second tanh MLP over the normalized observation.
        last_out = obz
        for i in range(num_hid_layers):
            last_out = tf.nn.tanh(U.dense(last_out, hid_size, "polfc%i" % (i + 1), weight_init=U.normc_initializer(1.0)))
        if gaussian_fixed_var and isinstance(ac_space, gym.spaces.Box):
            # Diagonal Gaussian with a state-independent, per-option log std.
            mean = dense3D2(last_out, pdtype.param_shape()[0] // 2, "polfinal", option, num_options=num_options, weight_init=U.normc_initializer(0.01))
            logstd = tf.get_variable(name="logstd", shape=[num_options, 1, pdtype.param_shape()[0] // 2], initializer=tf.zeros_initializer())
            pdparam = U.concatenate([mean, mean * 0.0 + logstd[option[0]]], axis=1)
        else:
            # Note: logstd is only defined in the branch above, so the _act
            # function below assumes gaussian_fixed_var=True with a Box action space.
            pdparam = U.dense(last_out, pdtype.param_shape()[0], "polfinal", U.normc_initializer(0.01))

        # Policy over options: softmax head on the (detached) policy features.
        self.op_pi = tf.nn.softmax(U.dense(tf.stop_gradient(last_out), num_options, "OPfc%i" % (i + 1), weight_init=U.normc_initializer(1.0)))

        self.pd = pdtype.pdfromflat(pdparam)

        self.state_in = []
        self.state_out = []

        stochastic = tf.placeholder(dtype=tf.bool, shape=())
        ac = U.switch(stochastic, self.pd.sample(), self.pd.mode())
        self._act = U.function([stochastic, ob, option], [ac, self.vpred, last_out, logstd])
        self._get_v = U.function([ob, option], [self.vpred])
        self.get_term = U.function([ob, option], [termination_sample])
        self.get_tpred = U.function([ob, option], [self.tpred])
        self.get_vpred = U.function([ob, option], [self.vpred])
        self._get_op = U.function([ob], [self.op_pi])

    def act(self, stochastic, ob, option):
        ac1, vpred1, feats, logstd = self._act(stochastic, ob[None], [option])
        return ac1[0], vpred1[0], feats[0], logstd[option][0]

    def get_option(self, ob):
        # Sample an option from the policy over options.
        op_prob = self._get_op([ob])[0][0]
        return np.random.choice(range(len(op_prob)), p=op_prob)

    def get_term_adv(self, ob, curr_opt):
        # Termination advantage Q(s, w) - V(s), where V(s) is the expectation
        # of Q over the policy over options; returned both with and without
        # the deliberation cost dc.
        vals = []
        for opt in range(self.num_options):
            vals.append(self._get_v(ob, [opt])[0])
        vals = np.array(vals)
        op_prob = self._get_op(ob)[0].transpose()
        return (vals[curr_opt[0]] - np.sum((op_prob * vals), axis=0) + self.dc), \
               (vals[curr_opt[0]] - np.sum((op_prob * vals), axis=0))

    def get_variables(self):
        return tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, self.scope)

    def get_trainable_variables(self):
        return tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, self.scope)

    def get_initial_state(self):
        return []
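
# Minimal usage sketch, kept as comments. It assumes a TF1 graph-mode session
# created via baselines' tf_util; the environment name and hyperparameters
# below are illustrative, not prescribed by this module.
#
#   env = gym.make("Hopper-v2")
#   pi = MlpPolicy("pi", env.observation_space, env.action_space,
#                  hid_size=64, num_hid_layers=2, num_options=2, dc=0.1)
#   sess = U.make_session(num_cpu=1)
#   sess.__enter__()
#   U.initialize()
#
#   ob = env.reset()
#   opt = pi.get_option(ob)                       # sample an option
#   ac, vpred, feats, logstd = pi.act(True, ob, opt)
#   term = pi.get_term([ob], [opt])[0][0]         # should the option end here?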