pg.py
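# Policy-gradient (REINFORCE-style) agent for Atari Pong using Keras.
# The policy network maps a preprocessed 80x80 difference frame to a softmax
# over actions; after each episode the stored action probabilities are nudged
# toward the actions taken, weighted by discounted and normalized rewards.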
import gym
import numpy as np
from keras.models import Sequential
from keras.layers import Dense, Reshape, Flatten
from keras.optimizers import Adam
from keras.layers.convolutional import Convolution2D


class PGAgent:
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.gamma = 0.99            # discount factor for future rewards
        self.learning_rate = 0.001
        self.states = []
        self.gradients = []
        self.rewards = []
        self.probs = []
        self.model = self._build_model()
        self.model.summary()
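
    # Build the policy network: a small conv layer over the 80x80 frame,
    # flattened into dense layers, ending in a softmax over the action space.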
    def _build_model(self):
        model = Sequential()
        model.add(Reshape((1, 80, 80), input_shape=(self.state_size,)))
        model.add(Convolution2D(32, 6, 6, subsample=(3, 3), border_mode='same',
                                activation='relu', init='he_uniform'))
        model.add(Flatten())
        model.add(Dense(64, activation='relu', init='he_uniform'))
        model.add(Dense(32, activation='relu', init='he_uniform'))
        model.add(Dense(self.action_size, activation='softmax'))
        opt = Adam(lr=self.learning_rate)
        model.compile(loss='categorical_crossentropy', optimizer=opt)
        return model
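
    # Store one timestep of experience: the state, the reward, and the
    # "gradient" (one-hot encoding of the chosen action minus the predicted
    # action probabilities).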
    def memorize(self, state, action, prob, reward):
        y = np.zeros([self.action_size])
        y[action] = 1
        self.gradients.append(np.array(y).astype('float32') - prob)
        self.states.append(state)
        self.rewards.append(reward)
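
    # Sample an action from the current policy's probability distribution.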
    def act(self, state):
        state = state.reshape([1, state.shape[0]])
        aprob = self.model.predict(state, batch_size=1).flatten()
        self.probs.append(aprob)
        prob = aprob / np.sum(aprob)
        action = np.random.choice(self.action_size, 1, p=prob)[0]
        return action, prob
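
    # Compute discounted returns, resetting the running sum whenever a
    # non-zero reward is seen (i.e., at each Pong point boundary).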
    def discount_rewards(self, rewards):
        discounted_rewards = np.zeros_like(rewards)
        running_add = 0
        for t in reversed(range(0, rewards.size)):
            if rewards[t] != 0:
                running_add = 0
            running_add = running_add * self.gamma + rewards[t]
            discounted_rewards[t] = running_add
        return discounted_rewards
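
    # Fit the policy on the whole episode: scale the stored gradients by the
    # normalized discounted rewards and nudge the predicted probabilities
    # toward the reward-weighted taken actions.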
    def train(self):
        gradients = np.vstack(self.gradients)
        rewards = np.vstack(self.rewards)
        rewards = self.discount_rewards(rewards)
        rewards = (rewards - np.mean(rewards)) / (np.std(rewards) + 1e-7)
        gradients *= rewards
        X = np.squeeze(np.vstack([self.states]))
        Y = self.probs + self.learning_rate * np.squeeze(np.vstack([gradients]))
        self.model.train_on_batch(X, Y)
        self.states, self.probs, self.gradients, self.rewards = [], [], [], []

    def load(self, name):
        self.model.load_weights(name)

    def save(self, name):
        self.model.save_weights(name)
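

# Crop, downsample, and binarize a raw Pong frame (erasing the background
# colors 144 and 109) into a flat 80*80 vector.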
def preprocess(I):
    I = I[35:195]
    I = I[::2, ::2, 0]
    I[I == 144] = 0
    I[I == 109] = 0
    I[I != 0] = 1
    return I.astype(float).ravel()
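

# Play Pong indefinitely, updating the policy after every finished episode
# and checkpointing the weights every 10 episodes.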
if __name__ == "__main__":
    env = gym.make("Pong-v0")
    state = env.reset()
    prev_x = None
    score = 0
    episode = 0
    state_size = 80 * 80
    action_size = env.action_space.n
    agent = PGAgent(state_size, action_size)
    agent.load('pong.h5')
    while True:
        env.render()
        cur_x = preprocess(state)
        # Use the difference of consecutive frames as the state to capture motion.
        x = cur_x - prev_x if prev_x is not None else np.zeros(state_size)
        prev_x = cur_x
        action, prob = agent.act(x)
        state, reward, done, info = env.step(action)
        score += reward
        agent.memorize(x, action, prob, reward)
        if done:
            episode += 1
            agent.train()
            print('Episode: %d - Score: %f.' % (episode, score))
            score = 0
            state = env.reset()
            prev_x = None
            if episode > 1 and episode % 10 == 0:
                agent.save('pong.h5')