# Solving reinforcement learning problems using pgpelib with parallelization
# ==========================================================================
#
# In this example, we are going to solve the `CartPole-v1` environment, and
# we will also use parallelization to decrease the execution time of the
# evolution.
#
# `pgpelib` is agnostic when it comes to parallelization: the choice of
# parallelization library is left to the user.
#
# In this example, we use the standard library's `multiprocessing` module.
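#
# The overall pattern is the same regardless of the parallelization library
# (`parallel_map` below is just a placeholder for whatever mechanism is used;
# in this example it is `Pool.map`):
#
#     solutions = pgpe.ask()        # sample a population of solutions
#     fitnesses = parallel_map(evaluate_solution, solutions)
#     pgpe.tell(fitnesses)          # update the search distribution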

from pgpelib import PGPE
from pgpelib.policies import LinearPolicy, MLPPolicy
from pgpelib.restore import to_torch_module

import numpy as np
import torch
import gym

import multiprocessing as mp

ENV_NAME = 'CartPole-v1'

policy = MLPPolicy(
    env_name=ENV_NAME,         # Name of the environment
    num_hidden=1,              # Number of hidden layers
    hidden_size=8,             # Size of a hidden layer
    hidden_activation='tanh',  # Activation function of the hidden layer

    # Whether or not to do online normalization on the observations
    # received from the environments.
    # In this tutorial, we set it to False just to keep things simple.
    # Note that, with observation_normalization, we would need to
    # synchronize the observation stats between the main process and the
    # worker processes.
    observation_normalization=False
)
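
# Note: if observation_normalization were enabled above, every worker process
# would accumulate its own running observation statistics, and those
# statistics would have to be merged back into the main process' policy after
# each generation. That synchronization is intentionally left out of this
# example; consult pgpelib's documentation for how to do it.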

def evaluate_solution(solution: np.ndarray):
    global policy
    fitness, _ = policy.set_params_and_run(solution)
    return fitness
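
# How the workers see `policy`: on platforms where multiprocessing forks the
# main process (e.g. Linux), each worker inherits the module-level `policy`
# object; on platforms that spawn fresh interpreters (e.g. Windows, recent
# macOS), this module is re-imported in each worker, so the
# `policy = MLPPolicy(...)` line above runs again there. A possible
# alternative (not used in this example, shown only as a sketch) is to build
# the policy explicitly in each worker via a Pool initializer:
#
#     def init_worker():
#         global policy
#         policy = MLPPolicy(env_name=ENV_NAME, ...)
#
#     mp.Pool(processes=mp.cpu_count(), initializer=init_worker)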

def main():
    # Initial solution
    x0 = np.zeros(policy.get_parameters_count(), dtype='float32')

    # Below we initialize our PGPE solver.
    pgpe = PGPE(
        solution_length=policy.get_parameters_count(),  # Length of a solution
        popsize=250,                     # Population size
        center_init=x0,                  # Initial center of the search distribution
        center_learning_rate=0.075,      # Learning rate for the center updates
        optimizer='clipup',              # Optimizer used for the center updates
        optimizer_config={'max_speed': 0.15},  # Maximum update speed of ClipUp
        stdev_init=0.08,                 # Initial standard deviation
        stdev_learning_rate=0.1,         # Learning rate for the standard deviation
        stdev_max_change=0.2,            # Maximum relative change allowed for the stdev
        solution_ranking=True,           # Use rank-based fitness shaping
        dtype='float32'                  # dtype of the solutions
    )
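
    # (A commonly cited heuristic for ClipUp is to set the center learning
    # rate to about half of max_speed; the values above follow that ratio,
    # 0.075 = 0.15 / 2. Treat these hyperparameters as starting points rather
    # than tuned constants.)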

    # Here, we make a pool of worker processes.
    # With the help of these workers, we aim to parallelize the
    # evaluation of the solutions.
    with mp.Pool(processes=mp.cpu_count()) as pool:
        num_iterations = 50

        # The main loop of the evolutionary computation
        for i in range(1, 1 + num_iterations):
            # Get the solutions from the pgpe solver
            solutions = pgpe.ask()

            # Evaluate the solutions in parallel and get the fitnesses.
            # Pool.map preserves order, so each fitness corresponds to the
            # solution at the same index, which is what pgpe.tell() expects.
            fitnesses = pool.map(evaluate_solution, solutions)

            # Send the pgpe solver the received fitnesses
            pgpe.tell(fitnesses)

            print("Iteration:", i, " median score:", np.median(fitnesses))

    print("Visualizing the center solution...")

    # Get the center solution
    center_solution = pgpe.center.copy()

    # Make the gym environment for visualizing the center solution.
    # (This example uses the classic gym API, in which reset() returns only
    # the observation and step() returns a 4-tuple.)
    env = gym.make(ENV_NAME)

    # Convert the center solution to a PyTorch module
    policy.set_parameters(center_solution)
    net = to_torch_module(policy)

    cumulative_reward = 0.0

    # Reset the environment, and get the observation of the initial
    # state into a variable.
    observation = env.reset()

    # Visualize the initial state
    env.render()

    # Main loop of the trajectory
    while True:
        # Compute the network's output for the current observation.
        with torch.no_grad():
            action = net(
                torch.as_tensor(observation, dtype=torch.float32)
            ).numpy()

        if isinstance(env.action_space, gym.spaces.Box):
            # Continuous action space: use the network's output directly.
            interaction = action
        elif isinstance(env.action_space, gym.spaces.Discrete):
            # Discrete action space (the case for CartPole): pick the action
            # with the highest network output.
            interaction = int(np.argmax(action))
        else:
            assert False, "Unknown action space"

        observation, reward, done, info = env.step(interaction)
        env.render()
        cumulative_reward += reward

        if done:
            break

    print("cumulative_reward", cumulative_reward)
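
# The __main__ guard below matters when using multiprocessing: on platforms
# that spawn worker processes (e.g. Windows), each worker re-imports this
# module, and without the guard every worker would try to run main() itself.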
if __name__ == "__main__":
    main()