---
title: Setup
jupyter: python3
---
**Chapter 18 – Reinforcement Learning**
_This notebook contains all the sample code and solutions to the exercises in chapter 18._
<table align="left">
<td>
<a href="https://colab.research.google.com/github/ageron/handson-ml3/blob/main/18_reinforcement_learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>
</td>
<td>
<a target="_blank" href="https://kaggle.com/kernels/welcome?src=https://github.com/ageron/handson-ml3/blob/main/18_reinforcement_learning.ipynb"><img src="https://kaggle.com/static/images/open-in-kaggle.svg" /></a>
</td>
</table>
This project requires Python 3.7 or above:
```{python}
#| id: TFSU3FCOpKzu
import sys
assert sys.version_info >= (3, 7)
```
And TensorFlow ≥ 2.8:
```{python}
#| id: 0Piq5se2pKzx
from packaging import version
import tensorflow as tf
assert version.parse(tf.__version__) >= version.parse("2.8.0")
```
As we did in earlier chapters, let's define the default font sizes to make the figures prettier. We will also display some Matplotlib animations, and there are several possible options to do that: we will use the Javascript option.
```{python}
#| id: 8d4TH3NbpKzx
import matplotlib.animation
import matplotlib.pyplot as plt
plt.rc('font', size=14)
plt.rc('axes', labelsize=14, titlesize=14)
plt.rc('legend', fontsize=14)
plt.rc('xtick', labelsize=10)
plt.rc('ytick', labelsize=10)
plt.rc('animation', html='jshtml')
```
And let's create the `images/rl` folder (if it doesn't already exist), and define the `save_fig()` function, which is used throughout this notebook to save the figures in high-res for the book:
```{python}
#| id: PQFH5Y9PpKzy
from pathlib import Path
IMAGES_PATH = Path() / "images" / "rl"
IMAGES_PATH.mkdir(parents=True, exist_ok=True)
def save_fig(fig_id, tight_layout=True, fig_extension="png", resolution=300):
    path = IMAGES_PATH / f"{fig_id}.{fig_extension}"
    if tight_layout:
        plt.tight_layout()
    plt.savefig(path, format=fig_extension, dpi=resolution)
```
This chapter can be very slow without a GPU, so let's make sure there's one, or else issue a warning:
```{python}
#| id: Ekxzo6pOpKzy
if not tf.config.list_physical_devices('GPU'):
    print("No GPU was detected. Neural nets can be very slow without a GPU.")
    if "google.colab" in sys.modules:
        print("Go to Runtime > Change runtime and select a GPU hardware "
              "accelerator.")
    if "kaggle_secrets" in sys.modules:
        print("Go to Settings > Accelerator and select GPU.")
```
Let's install the gym library, which provides many environments for Reinforcement Learning. We'll also install the extra libraries needed for classic control environments (including CartPole, which we will use shortly), as well as for Box2D and Atari environments, which are needed for the exercises.
**Note:** by running the following cell, you accept the Atari ROM license.
```{python}
if "google.colab" in sys.modules or "kaggle_secrets" in sys.modules:
    %pip install -q -U gym
    %pip install -q -U gym[classic_control,box2d,atari,accept-rom-license]
```
# Introduction to OpenAI gym
In this notebook we will be using [OpenAI gym](https://gym.openai.com/), a great toolkit for developing and comparing Reinforcement Learning algorithms. It provides many environments for your learning *agents* to interact with. Let's import Gym and make a new CartPole environment:
```{python}
import gym
env = gym.make("CartPole-v1", render_mode="rgb_array")
```
The CartPole (version 1) is a very simple environment composed of a cart that can move left or right, and a pole placed vertically on top of it. The agent must move the cart left or right to keep the pole upright.
**Tip**: `gym.envs.registry` is a dictionary containing all available environments:
```{python}
# extra code – shows the first few environments
envs = gym.envs.registry
sorted(envs.keys())[:5] + ["..."]
```
The registry values are environment specifications:
```{python}
# extra code – shows the specification for the CartPole-v1 environment
envs["CartPole-v1"]
```
Let's initialize the environment by calling its `reset()` method. This returns an observation, as well as a dictionary that may contain extra information. Both are environment-specific.
```{python}
obs, info = env.reset(seed=42)
obs
```
```{python}
info
```
In the case of the CartPole, each observation is a 1D NumPy array composed of 4 floats: they represent the cart's horizontal position, its velocity, the angle of the pole (0 = vertical), and the angular velocity.
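For instance, here is a quick way to label those four values (this little check is not in the book):

```{python}
# extra check (not in the book) – labels the four values in the observation
for name, value in zip(["position", "velocity", "angle", "angular velocity"],
                       obs):
    print(f"{name:>16}: {value:.4f}")
```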
An environment can be visualized by calling its `render()` method. If you set `render_mode` to `"rgb_array"` when creating the environment, then this will return a NumPy array.
```{python}
img = env.render()
img.shape # height, width, channels (3 = Red, Green, Blue)
```
```{python}
# extra code – creates a little function to render and plot an environment
def plot_environment(env, figsize=(5, 4)):
    plt.figure(figsize=figsize)
    img = env.render()
    plt.imshow(img)
    plt.axis("off")
    return img

plot_environment(env)
plt.show()
```
Let's see how to interact with an environment. Your agent will need to select an action from an "action space" (the set of possible actions). Let's see what this environment's action space looks like:
```{python}
env.action_space
```
Yep, just two possible actions: accelerate towards the left (0) or towards the right (1).
Since the pole is leaning toward the right (`obs[2] > 0`), let's accelerate the cart toward the right:
```{python}
action = 1 # accelerate right
obs, reward, done, truncated, info = env.step(action)
obs
```
Notice that the cart is now moving toward the right (`obs[1] > 0`). The pole is still tilted toward the right (`obs[2] > 0`), but its angular velocity is now negative (`obs[3] < 0`), so it will likely be tilted toward the left after the next step.
```{python}
# extra code – displays the environment
plot_environment(env)
save_fig("cart_pole_plot")
plt.show()
```
Looks like it's doing what we're telling it to do!
The environment also tells the agent how much reward it got during the last step:
```{python}
reward
```
When the game is over, the environment returns `done=True`. In this case, it's not over yet:
```{python}
done
```
Some environment wrappers may interrupt the environment early, for example when a time limit is reached or when an object goes out of bounds. When that happens, `truncated` is set to `True`. In this case, the episode is not truncated yet:
```{python}
truncated
```
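For CartPole-v1, truncation comes from a `TimeLimit` wrapper, so even an episode that never ends will be truncated after a fixed number of steps. Assuming the environment spec is populated, you can check that limit (this check is not in the book):

```{python}
# extra check (not in the book) – the episode is truncated after this many steps
env.spec.max_episode_steps
```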
Finally, `info` is an environment-specific dictionary that can provide some extra information that you may find useful for debugging or for training. For example, in some games it may indicate how many lives the agent has.
```{python}
info
```
The sequence of steps between the moment the environment is reset until it is done or truncated is called an "episode". At the end of an episode (i.e., when `step()` returns `done=True` or `truncated=True`), you should reset the environment before you continue to use it.
```{python}
if done or truncated:
    obs, info = env.reset()
```
Now how can we make the pole remain upright? We will need to define a _policy_ for that. This is the strategy that the agent will use to select an action at each step. It can use all the past actions and observations to decide what to do.
# A simple hard-coded policy
Let's hard code a simple strategy: if the pole is tilting to the left, then push the cart to the left, and _vice versa_. Let's see if that works:
```{python}
def basic_policy(obs):
    angle = obs[2]
    return 0 if angle < 0 else 1

totals = []
for episode in range(500):
    episode_rewards = 0
    obs, info = env.reset(seed=episode)
    for step in range(200):
        action = basic_policy(obs)
        obs, reward, done, truncated, info = env.step(action)
        episode_rewards += reward
        if done or truncated:
            break
    totals.append(episode_rewards)
```
```{python}
import numpy as np
np.mean(totals), np.std(totals), min(totals), max(totals)
```
Well, as expected, this strategy is a bit too basic: the best it did was to keep the pole up for only 63 steps. This environment is considered solved when the agent keeps the pole up for 200 steps.
Let's visualize one episode. You can learn more about Matplotlib animations in the [Matplotlib tutorial notebook](tools_matplotlib.ipynb#Animations).
```{python}
# extra code – this cell displays an animation of one episode
def update_scene(num, frames, patch):
    patch.set_data(frames[num])
    return patch,

def plot_animation(frames, repeat=False, interval=40):
    fig = plt.figure()
    patch = plt.imshow(frames[0])
    plt.axis('off')
    anim = matplotlib.animation.FuncAnimation(
        fig, update_scene, fargs=(frames, patch),
        frames=len(frames), repeat=repeat, interval=interval)
    plt.close()
    return anim

def show_one_episode(policy, n_max_steps=200, seed=42):
    frames = []
    env = gym.make("CartPole-v1", render_mode="rgb_array")
    np.random.seed(seed)
    obs, info = env.reset(seed=seed)
    for step in range(n_max_steps):
        frames.append(env.render())
        action = policy(obs)
        obs, reward, done, truncated, info = env.step(action)
        if done or truncated:
            break
    env.close()
    return plot_animation(frames)

show_one_episode(basic_policy)
```
Clearly the system is unstable and after just a few wobbles, the pole ends up too tilted: game over. We will need to be smarter than that!
# Neural Network Policies
Let's create a neural network that will take observations as inputs, and output the probabilities of actions to take for each observation. To choose an action, the network will estimate a probability for each action, then we will select an action randomly according to the estimated probabilities. In the case of the CartPole environment, there are just two possible actions (left or right), so we only need one output neuron: it will output the probability `p` of the action 0 (left), and of course the probability of action 1 (right) will be `1 - p`.
```{python}
import tensorflow as tf
tf.random.set_seed(42) # extra code – ensures reproducibility on the CPU
model = tf.keras.Sequential([
    tf.keras.layers.Dense(5, activation="relu"),
    tf.keras.layers.Dense(1, activation="sigmoid"),
])
```
In this particular environment, the past actions and observations can safely be ignored, since each observation contains the environment's full state. If there were some hidden state then you may need to consider past actions and observations in order to try to infer the hidden state of the environment. For example, if the environment only revealed the position of the cart but not its velocity, you would have to consider not only the current observation but also the previous observation in order to estimate the current velocity. Another example is if the observations are noisy: you may want to use the past few observations to estimate the most likely current state. Our problem is thus as simple as can be: the current observation is noise-free and contains the environment's full state.
You may wonder why we plan to pick a random action based on the probability given by the policy network, rather than just picking the action with the highest probability. This approach lets the agent find the right balance between _exploring_ new actions and _exploiting_ the actions that are known to work well. Here's an analogy: suppose you go to a restaurant for the first time, and all the dishes look equally appealing so you randomly pick one. If it turns out to be good, you can increase the probability to order it next time, but you shouldn't increase that probability to 100%, or else you will never try out the other dishes, some of which may be even better than the one you tried.
Let's write a small policy function that will use the neural net to get the probability of moving left, then let's use it to run one episode:
```{python}
# extra code – a function that creates an animation for a given policy model
def pg_policy(obs):
    left_proba = model.predict(obs[np.newaxis], verbose=0)
    return int(np.random.rand() > left_proba)

np.random.seed(42)
show_one_episode(pg_policy)
```
Yeah... pretty bad. The neural network will have to learn to do better. First let's see if it is capable of learning the basic policy we used earlier: go left if the pole is tilting left, and go right if it is tilting right.
Let's see if it can learn a better policy on its own, one that does not wobble as much.
# Policy Gradients
To train this neural network we will need to define the target probabilities **y**. If an action is good we should increase its probability, and conversely if it is bad we should reduce it. But how do we know whether an action is good or bad? The problem is that most actions have delayed effects, so when you win or lose points in an episode, it is not clear which actions contributed to this result: was it just the last action? Or the last 10? Or just one action 50 steps earlier? This is called the _credit assignment problem_.
The _Policy Gradients_ algorithm tackles this problem by first playing multiple episodes, then making the actions near positive rewards slightly more likely, while actions near negative rewards are made slightly less likely. First we play, then we go back and think about what we did.
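Concretely, each action's score is the discounted sum of the rewards that came after it, where $\gamma$ is the discount factor (this is what the `discount_rewards()` function defined below computes):

$$
R_t = r_t + \gamma\, r_{t+1} + \gamma^2\, r_{t+2} + \cdots
$$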
Let's start by creating a function to play a single step using the model. We will also pretend for now that whatever action it takes is the right one, so we can compute the loss and its gradients. We will just save these gradients for now, and modify them later depending on how good or bad the action turned out to be.
```{python}
def play_one_step(env, obs, model, loss_fn):
    with tf.GradientTape() as tape:
        left_proba = model(obs[np.newaxis])
        action = (tf.random.uniform([1, 1]) > left_proba)
        y_target = tf.constant([[1.]]) - tf.cast(action, tf.float32)
        loss = tf.reduce_mean(loss_fn(y_target, left_proba))
    grads = tape.gradient(loss, model.trainable_variables)
    obs, reward, done, truncated, info = env.step(int(action))
    return obs, reward, done, truncated, grads
```
If `left_proba` is high, then `action` will most likely be `False` (since a random number uniformly sampled between 0 and 1 will probably not be greater than `left_proba`). And `False` means 0 when you cast it to a number, so `y_target` would be equal to 1 - 0 = 1. In other words, we set the target to 1, meaning we pretend that the probability of going left should have been 100% (so we took the right action).
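As a quick illustration (this cell is not in the book), here is the same sampling logic applied to a fixed, high `left_proba`:

```{python}
# illustrative only – with left_proba = 0.9, the sampled action is almost
# always 0 (left), so the target probability of going left is set to 1.0
left_proba = tf.constant([[0.9]])
action = (tf.random.uniform([1, 1], seed=42) > left_proba)
y_target = tf.constant([[1.]]) - tf.cast(action, tf.float32)
action.numpy(), y_target.numpy()
```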
Now let's create another function that will rely on the `play_one_step()` function to play multiple episodes, returning all the rewards and gradients, for each episode and each step:
```{python}
def play_multiple_episodes(env, n_episodes, n_max_steps, model, loss_fn):
    all_rewards = []
    all_grads = []
    for episode in range(n_episodes):
        current_rewards = []
        current_grads = []
        obs, info = env.reset()
        for step in range(n_max_steps):
            obs, reward, done, truncated, grads = play_one_step(
                env, obs, model, loss_fn)
            current_rewards.append(reward)
            current_grads.append(grads)
            if done or truncated:
                break
        all_rewards.append(current_rewards)
        all_grads.append(current_grads)
    return all_rewards, all_grads
```
The Policy Gradients algorithm uses the model to play the episode several times (e.g., 10 times), then it goes back and looks at all the rewards, discounts them and normalizes them. So let's create a couple of functions for that: the first will compute discounted rewards; the second will normalize the discounted rewards across many episodes.
```{python}
def discount_rewards(rewards, discount_factor):
    discounted = np.array(rewards)
    for step in range(len(rewards) - 2, -1, -1):
        discounted[step] += discounted[step + 1] * discount_factor
    return discounted

def discount_and_normalize_rewards(all_rewards, discount_factor):
    all_discounted_rewards = [discount_rewards(rewards, discount_factor)
                              for rewards in all_rewards]
    flat_rewards = np.concatenate(all_discounted_rewards)
    reward_mean = flat_rewards.mean()
    reward_std = flat_rewards.std()
    return [(discounted_rewards - reward_mean) / reward_std
            for discounted_rewards in all_discounted_rewards]
```
Say there were 3 actions, and after each action there was a reward: first 10, then 0, then -50. If we use a discount factor of 80%, then the 3rd action will get -50 (full credit for the last reward), but the 2nd action will only get -40 (80% credit for the last reward), and the 1st action will get 80% of -40 (-32) plus full credit for the first reward (+10), which leads to a discounted reward of -22:
```{python}
discount_rewards([10, 0, -50], discount_factor=0.8)
```
To normalize all discounted rewards across all episodes, we compute the mean and standard deviation of all the discounted rewards, and we subtract the mean from each discounted reward, and divide by the standard deviation:
```{python}
#| scrolled: true
discount_and_normalize_rewards([[10, 0, -50], [10, 20]],
                               discount_factor=0.8)
```
```{python}
n_iterations = 150
n_episodes_per_update = 10
n_max_steps = 200
discount_factor = 0.95
```
```{python}
# extra code – let's create the neural net and reset the environment, for
# reproducibility
tf.random.set_seed(42)
model = tf.keras.Sequential([
    tf.keras.layers.Dense(5, activation="relu"),
    tf.keras.layers.Dense(1, activation="sigmoid"),
])
obs, info = env.reset(seed=42)
```
```{python}
optimizer = tf.keras.optimizers.Nadam(learning_rate=0.01)
loss_fn = tf.keras.losses.binary_crossentropy
```
```{python}
for iteration in range(n_iterations):
    all_rewards, all_grads = play_multiple_episodes(
        env, n_episodes_per_update, n_max_steps, model, loss_fn)

    # extra code – displays some debug info during training
    total_rewards = sum(map(sum, all_rewards))
    print(f"\rIteration: {iteration + 1}/{n_iterations},"
          f" mean rewards: {total_rewards / n_episodes_per_update:.1f}", end="")

    all_final_rewards = discount_and_normalize_rewards(all_rewards,
                                                       discount_factor)
    all_mean_grads = []
    for var_index in range(len(model.trainable_variables)):
        mean_grads = tf.reduce_mean(
            [final_reward * all_grads[episode_index][step][var_index]
             for episode_index, final_rewards in enumerate(all_final_rewards)
             for step, final_reward in enumerate(final_rewards)], axis=0)
        all_mean_grads.append(mean_grads)

    optimizer.apply_gradients(zip(all_mean_grads, model.trainable_variables))
```
```{python}
# extra code – displays the animation
np.random.seed(42)
show_one_episode(pg_policy)
```
# Extra Material – Markov Chains
The following transition probabilities correspond to the Markov Chain represented in Figure 18–7. Let's run this stochastic process a few times to see what it looks like:
```{python}
np.random.seed(42)
transition_probabilities = [  # shape=[s, s']
    [0.7, 0.2, 0.0, 0.1],  # from s0 to s0, s1, s2, s3
    [0.0, 0.0, 0.9, 0.1],  # from s1 to s0, s1, s2, s3
    [0.0, 1.0, 0.0, 0.0],  # from s2 to s0, s1, s2, s3
    [0.0, 0.0, 0.0, 1.0]]  # from s3 to s0, s1, s2, s3
n_max_steps = 1000 # to avoid blocking in case of an infinite loop
terminal_states = [3]
def run_chain(start_state):
    current_state = start_state
    for step in range(n_max_steps):
        print(current_state, end=" ")
        if current_state in terminal_states:
            break
        current_state = np.random.choice(
            range(len(transition_probabilities)),
            p=transition_probabilities[current_state]
        )
    else:
        print("...", end="")
    print()

for idx in range(10):
    print(f"Run #{idx + 1}: ", end="")
    run_chain(start_state=0)
```
# Markov Decision Process
Let's define some transition probabilities, rewards and possible actions. For example, in state s0, if action a0 is chosen then with probability 0.7 we will go to state s0 with reward +10, with probability 0.3 we will go to state s1 with no reward, and we will never go to state s2 (so the transition probabilities are `[0.7, 0.3, 0.0]`, and the rewards are `[+10, 0, 0]`):
```{python}
transition_probabilities = [  # shape=[s, a, s']
    [[0.7, 0.3, 0.0], [1.0, 0.0, 0.0], [0.8, 0.2, 0.0]],
    [[0.0, 1.0, 0.0], None, [0.0, 0.0, 1.0]],
    [None, [0.8, 0.1, 0.1], None]
]
rewards = [  # shape=[s, a, s']
    [[+10, 0, 0], [0, 0, 0], [0, 0, 0]],
    [[0, 0, 0], [0, 0, 0], [0, 0, -50]],
    [[0, 0, 0], [+40, 0, 0], [0, 0, 0]]
]
possible_actions = [[0, 1, 2], [0, 2], [1]]
```
# Q-Value Iteration
```{python}
Q_values = np.full((3, 3), -np.inf) # -np.inf for impossible actions
for state, actions in enumerate(possible_actions):
    Q_values[state, actions] = 0.0  # for all possible actions
```
```{python}
gamma = 0.90 # the discount factor
history1 = [] # extra code – needed for the figure below
for iteration in range(50):
    Q_prev = Q_values.copy()
    history1.append(Q_prev)  # extra code
    for s in range(3):
        for a in possible_actions[s]:
            Q_values[s, a] = np.sum([
                transition_probabilities[s][a][sp]
                * (rewards[s][a][sp] + gamma * Q_prev[sp].max())
                for sp in range(3)])

history1 = np.array(history1)  # extra code
```
```{python}
Q_values
```
```{python}
Q_values.argmax(axis=1) # optimal action for each state
```
The optimal policy for this MDP, when using a discount factor of 0.90, is to choose action a0 when in state s0, and choose action a0 when in state s1, and finally choose action a1 (the only possible action) when in state s2. If you try again with a discount factor of 0.95 instead of 0.90, you will find that the optimal action for state s1 becomes a2. This is because the discount factor is larger so the agent values the future more, and it is therefore ready to pay an immediate penalty in order to get more future rewards.
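As a quick check (this cell is not in the book), we can repeat the Q-Value Iteration with a discount factor of 0.95 and verify that the optimal action for state s1 changes to a2:

```{python}
# extra check (not in the book) – Q-Value Iteration again, with gamma = 0.95
Q_values_95 = np.full((3, 3), -np.inf)  # -np.inf for impossible actions
for state, actions in enumerate(possible_actions):
    Q_values_95[state, actions] = 0.0

gamma_95 = 0.95
for iteration in range(50):
    Q_prev = Q_values_95.copy()
    for s in range(3):
        for a in possible_actions[s]:
            Q_values_95[s, a] = np.sum([
                transition_probabilities[s][a][sp]
                * (rewards[s][a][sp] + gamma_95 * Q_prev[sp].max())
                for sp in range(3)])

Q_values_95.argmax(axis=1)  # the optimal action for s1 should now be a2
```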
# Q-Learning
Q-Learning works by watching an agent play (e.g., randomly) and gradually improving its estimates of the Q-Values. Once it has accurate Q-Value estimates (or close enough), then the optimal policy consists in choosing the action that has the highest Q-Value (i.e., the greedy policy).
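At each observed transition $(s, a, r, s')$, the Q-Value estimate is nudged toward the greedy target using a learning rate $\alpha$:

$$
Q(s, a) \leftarrow (1 - \alpha)\, Q(s, a) + \alpha\, \bigl(r + \gamma \max_{a'} Q(s', a')\bigr)
$$

This is exactly the update performed by the training loop below.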
We will need to simulate an agent moving around in the environment, so let's define a function to perform some action and get the new state and a reward:
```{python}
def step(state, action):
    probas = transition_probabilities[state][action]
    next_state = np.random.choice([0, 1, 2], p=probas)
    reward = rewards[state][action][next_state]
    return next_state, reward
```
We also need an exploration policy, which can be any policy, as long as it visits every possible state many times. We will just use a random policy, since the state space is very small:
```{python}
def exploration_policy(state):
    return np.random.choice(possible_actions[state])
```
Now let's initialize the Q-Values like earlier, and run the Q-Learning algorithm:
```{python}
# extra code – initializes the Q-Values, just like earlier
np.random.seed(42)
Q_values = np.full((3, 3), -np.inf)
for state, actions in enumerate(possible_actions):
    Q_values[state][actions] = 0
```
```{python}
alpha0 = 0.05 # initial learning rate
decay = 0.005 # learning rate decay
gamma = 0.90 # discount factor
state = 0 # initial state
history2 = [] # extra code – needed for the figure below
for iteration in range(10_000):
    history2.append(Q_values.copy())  # extra code
    action = exploration_policy(state)
    next_state, reward = step(state, action)
    next_value = Q_values[next_state].max()  # greedy policy at the next step
    alpha = alpha0 / (1 + iteration * decay)
    Q_values[state, action] *= 1 - alpha
    Q_values[state, action] += alpha * (reward + gamma * next_value)
    state = next_state

history2 = np.array(history2)  # extra code
```
```{python}
# extra code – this cell generates and saves Figure 18–9
true_Q_value = history1[-1, 0, 0]
fig, axes = plt.subplots(1, 2, figsize=(10, 4), sharey=True)
axes[0].set_ylabel("Q-Value$(s_0, a_0)$", fontsize=14)
axes[0].set_title("Q-Value Iteration", fontsize=14)
axes[1].set_title("Q-Learning", fontsize=14)
for ax, width, history in zip(axes, (50, 10000), (history1, history2)):
    ax.plot([0, width], [true_Q_value, true_Q_value], "k--")
    ax.plot(np.arange(width), history[:, 0, 0], "b-", linewidth=2)
    ax.set_xlabel("Iterations", fontsize=14)
    ax.axis([0, width, 0, 24])
    ax.grid(True)
save_fig("q_value_plot")
plt.show()
```
# Deep Q-Network
Let's build the DQN. Given a state, it will estimate, for each possible action, the sum of discounted future rewards it can expect after it plays that action (but before it sees its outcome):
```{python}
tf.random.set_seed(42) # extra code – ensures reproducibility on the CPU
input_shape = [4] # == env.observation_space.shape
n_outputs = 2 # == env.action_space.n
model = tf.keras.Sequential([
    tf.keras.layers.Dense(32, activation="elu", input_shape=input_shape),
    tf.keras.layers.Dense(32, activation="elu"),
    tf.keras.layers.Dense(n_outputs)
])
```
To select an action using this DQN, we just pick the action with the largest predicted Q-value. However, to ensure that the agent explores the environment, we choose a random action with probability `epsilon`.
```{python}
def epsilon_greedy_policy(state, epsilon=0):
    if np.random.rand() < epsilon:
        return np.random.randint(n_outputs)  # random action
    else:
        Q_values = model.predict(state[np.newaxis], verbose=0)[0]
        return Q_values.argmax()  # optimal action according to the DQN
```
We will also need a replay buffer. It will contain the agent's experiences, in the form of tuples: `(obs, action, reward, next_obs, done)`. We can use the `deque` class for that:
```{python}
from collections import deque
replay_buffer = deque(maxlen=2000)
```
**Note**: for very large replay buffers, you may want to use a circular buffer instead, as random access time will be O(1) instead of O(N). Or you can check out DeepMind's [Reverb library](https://github.com/deepmind/reverb).
```{python}
# extra code – A basic circular buffer implementation
class ReplayBuffer:
    def __init__(self, max_size):
        self.buffer = np.empty(max_size, dtype=object)
        self.max_size = max_size
        self.index = 0
        self.size = 0

    def append(self, obj):
        self.buffer[self.index] = obj
        self.size = min(self.size + 1, self.max_size)
        self.index = (self.index + 1) % self.max_size

    def sample(self, batch_size):
        indices = np.random.randint(self.size, size=batch_size)
        return self.buffer[indices]
```
And let's create a function to sample experiences from the replay buffer. It will return 6 NumPy arrays: `[obs, actions, rewards, next_obs, dones, truncateds]`.
```{python}
def sample_experiences(batch_size):
    indices = np.random.randint(len(replay_buffer), size=batch_size)
    batch = [replay_buffer[index] for index in indices]
    return [
        np.array([experience[field_index] for experience in batch])
        for field_index in range(6)
    ]  # [states, actions, rewards, next_states, dones, truncateds]
```
Now we can create a function that will use the DQN to play one step, and record its experience in the replay buffer:
```{python}
def play_one_step(env, state, epsilon):
    action = epsilon_greedy_policy(state, epsilon)
    next_state, reward, done, truncated, info = env.step(action)
    replay_buffer.append((state, action, reward, next_state, done, truncated))
    return next_state, reward, done, truncated, info
```
Lastly, let's create a function that will sample some experiences from the replay buffer and perform a training step:
```{python}
# extra code – for reproducibility, and to generate the next figure
env.reset(seed=42)
np.random.seed(42)
tf.random.set_seed(42)
rewards = []
best_score = 0
```
```{python}
batch_size = 32
discount_factor = 0.95
optimizer = tf.keras.optimizers.Nadam(learning_rate=1e-2)
loss_fn = tf.keras.losses.mean_squared_error
def training_step(batch_size):
    experiences = sample_experiences(batch_size)
    states, actions, rewards, next_states, dones, truncateds = experiences
    next_Q_values = model.predict(next_states, verbose=0)
    max_next_Q_values = next_Q_values.max(axis=1)
    runs = 1.0 - (dones | truncateds)  # episode is not done or truncated
    target_Q_values = rewards + runs * discount_factor * max_next_Q_values
    target_Q_values = target_Q_values.reshape(-1, 1)
    mask = tf.one_hot(actions, n_outputs)
    with tf.GradientTape() as tape:
        all_Q_values = model(states)
        Q_values = tf.reduce_sum(all_Q_values * mask, axis=1, keepdims=True)
        loss = tf.reduce_mean(loss_fn(target_Q_values, Q_values))
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))
```
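The targets computed in `training_step()` are the regular Q-Learning targets, except that the future term is dropped when the episode has ended (here, "done" also covers truncation):

$$
y = r + (1 - \text{done}) \cdot \gamma \cdot \max_{a'} Q_\theta(s', a')
$$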
And now, let's train the model!
```{python}
#| scrolled: true
for episode in range(600):
    obs, info = env.reset()
    for step in range(200):
        epsilon = max(1 - episode / 500, 0.01)
        obs, reward, done, truncated, info = play_one_step(env, obs, epsilon)
        if done or truncated:
            break

    # extra code – displays debug info, stores data for the next figure, and
    #              keeps track of the best model weights so far
    print(f"\rEpisode: {episode + 1}, Steps: {step + 1}, eps: {epsilon:.3f}",
          end="")
    rewards.append(step)
    if step >= best_score:
        best_weights = model.get_weights()
        best_score = step

    if episode > 50:
        training_step(batch_size)

model.set_weights(best_weights)  # extra code – restores the best model weights
```
```{python}
# extra code – this cell generates and saves Figure 18–10
plt.figure(figsize=(8, 4))
plt.plot(rewards)
plt.xlabel("Episode", fontsize=14)
plt.ylabel("Sum of rewards", fontsize=14)
plt.grid(True)
save_fig("dqn_rewards_plot")
plt.show()
```
```{python}
# extra code – shows an animation of the trained DQN playing one episode
show_one_episode(epsilon_greedy_policy)
```
Not bad at all! 😀
## Fixed Q-Value Targets
Let's create the online DQN:
```{python}
# extra code – creates the same DQN model as earlier
tf.random.set_seed(42)
model = tf.keras.Sequential([
    tf.keras.layers.Dense(32, activation="elu", input_shape=input_shape),
    tf.keras.layers.Dense(32, activation="elu"),
    tf.keras.layers.Dense(n_outputs)
])
```
Now create the target DQN: it's just a clone of the online DQN:
```{python}
target = tf.keras.models.clone_model(model) # clone the model's architecture
target.set_weights(model.get_weights()) # copy the weights
```
Next, we use the same code as above except for the line marked with `# <= CHANGED`:
```{python}
env.reset(seed=42)
np.random.seed(42)
tf.random.set_seed(42)
rewards = []
best_score = 0
batch_size = 32
discount_factor = 0.95
optimizer = tf.keras.optimizers.Nadam(learning_rate=1e-2)
loss_fn = tf.keras.losses.mean_squared_error
replay_buffer = deque(maxlen=2000) # resets the replay buffer
def training_step(batch_size):
    experiences = sample_experiences(batch_size)
    states, actions, rewards, next_states, dones, truncateds = experiences
    next_Q_values = target.predict(next_states, verbose=0)  # <= CHANGED
    max_next_Q_values = next_Q_values.max(axis=1)
    runs = 1.0 - (dones | truncateds)  # episode is not done or truncated
    target_Q_values = rewards + runs * discount_factor * max_next_Q_values
    target_Q_values = target_Q_values.reshape(-1, 1)
    mask = tf.one_hot(actions, n_outputs)
    with tf.GradientTape() as tape:
        all_Q_values = model(states)
        Q_values = tf.reduce_sum(all_Q_values * mask, axis=1, keepdims=True)
        loss = tf.reduce_mean(loss_fn(target_Q_values, Q_values))
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))
```
Again, this is the same code as earlier, except for the lines marked with `# <= CHANGED`:
```{python}
for episode in range(600):
    obs, info = env.reset()
    for step in range(200):
        epsilon = max(1 - episode / 500, 0.01)
        obs, reward, done, truncated, info = play_one_step(env, obs, epsilon)
        if done or truncated:
            break

    # extra code – displays debug info, stores data for the next figure, and
    #              keeps track of the best model weights so far
    print(f"\rEpisode: {episode + 1}, Steps: {step + 1}, eps: {epsilon:.3f}",
          end="")
    rewards.append(step)
    if step >= best_score:
        best_weights = model.get_weights()
        best_score = step

    if episode > 50:
        training_step(batch_size)
        if episode % 50 == 0:                        # <= CHANGED
            target.set_weights(model.get_weights())  # <= CHANGED
    # Alternatively, you can do soft updates at each step:
    #if episode > 50:
    #    training_step(batch_size)
    #    target_weights = target.get_weights()
    #    online_weights = model.get_weights()
    #    for index, online_weight in enumerate(online_weights):
    #        target_weights[index] = (0.99 * target_weights[index]
    #                                 + 0.01 * online_weight)
    #    target.set_weights(target_weights)

model.set_weights(best_weights)  # extra code – restores the best model weights
```
```{python}
# extra code – this cell plots the learning curve
plt.figure(figsize=(8, 4))
plt.plot(rewards)
plt.xlabel("Episode", fontsize=14)
plt.ylabel("Sum of rewards", fontsize=14)
plt.grid(True)
plt.show()
```
```{python}
#| scrolled: true
#| tags: []
# extra code – shows an animation of the trained DQN playing one episode
show_one_episode(epsilon_greedy_policy)
```
## Double DQN
The code is exactly the same as for fixed Q-Value targets, except for the section marked as changed in the `training_step()` function:
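In that changed section, the online model selects the best next action while the target model estimates its value, so the targets become the standard Double DQN targets (the notation here is ours, not the book's):

$$
y = r + (1 - \text{done}) \cdot \gamma \cdot Q_{\text{target}}\bigl(s', \operatorname{argmax}_{a'} Q_{\text{online}}(s', a')\bigr)
$$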
```{python}
tf.random.set_seed(42)
model = tf.keras.Sequential([
    tf.keras.layers.Dense(32, activation="elu", input_shape=input_shape),
    tf.keras.layers.Dense(32, activation="elu"),
    tf.keras.layers.Dense(n_outputs)
])
target = tf.keras.models.clone_model(model)  # clone the model's architecture
target.set_weights(model.get_weights())  # copy the weights

env.reset(seed=42)
np.random.seed(42)
tf.random.set_seed(42)

rewards = []
best_score = 0

batch_size = 32
discount_factor = 0.95
optimizer = tf.keras.optimizers.Nadam(learning_rate=1e-2)
loss_fn = tf.keras.losses.mean_squared_error

def training_step(batch_size):
    experiences = sample_experiences(batch_size)
    states, actions, rewards, next_states, dones, truncateds = experiences

    #################### CHANGED SECTION ####################
    next_Q_values = model.predict(next_states, verbose=0)  # ≠ target.predict()
    best_next_actions = next_Q_values.argmax(axis=1)
    next_mask = tf.one_hot(best_next_actions, n_outputs).numpy()
    max_next_Q_values = (target.predict(next_states, verbose=0) * next_mask
                        ).sum(axis=1)
    #########################################################

    runs = 1.0 - (dones | truncateds)  # episode is not done or truncated
    target_Q_values = rewards + runs * discount_factor * max_next_Q_values
    target_Q_values = target_Q_values.reshape(-1, 1)
    mask = tf.one_hot(actions, n_outputs)
    with tf.GradientTape() as tape:
        all_Q_values = model(states)
        Q_values = tf.reduce_sum(all_Q_values * mask, axis=1, keepdims=True)
        loss = tf.reduce_mean(loss_fn(target_Q_values, Q_values))
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

replay_buffer = deque(maxlen=2000)

for episode in range(600):
    obs, info = env.reset()
    for step in range(200):
        epsilon = max(1 - episode / 500, 0.01)
        obs, reward, done, truncated, info = play_one_step(env, obs, epsilon)
        if done or truncated:
            break

    print(f"\rEpisode: {episode + 1}, Steps: {step + 1}, eps: {epsilon:.3f}",
          end="")
    rewards.append(step)
    if step >= best_score:
        best_weights = model.get_weights()
        best_score = step

    if episode > 50:
        training_step(batch_size)
        if episode % 50 == 0:
            target.set_weights(model.get_weights())

model.set_weights(best_weights)
```
```{python}
# extra code – this cell plots the learning curve
plt.figure(figsize=(8, 4))
plt.plot(rewards)
plt.xlabel("Episode", fontsize=14)
plt.ylabel("Sum of rewards", fontsize=14)
plt.grid(True)
plt.show()
```
```{python}
#| scrolled: true
# extra code – shows an animation of the trained DQN playing one episode
show_one_episode(epsilon_greedy_policy)
```
## Dueling Double DQN
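In a dueling architecture, the network estimates both a state value $V(s)$ and per-action advantages $A(s, a)$; the advantages are centered by subtracting their maximum, so the Q-Values computed by the model below are:

$$
Q(s, a) = V(s) + A(s, a) - \max_{a'} A(s, a')
$$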
```{python}
tf.random.set_seed(42) # extra code – ensures reproducibility on the CPU
input_states = tf.keras.layers.Input(shape=[4])
hidden1 = tf.keras.layers.Dense(32, activation="elu")(input_states)
hidden2 = tf.keras.layers.Dense(32, activation="elu")(hidden1)
state_values = tf.keras.layers.Dense(1)(hidden2)
raw_advantages = tf.keras.layers.Dense(n_outputs)(hidden2)
advantages = raw_advantages - tf.reduce_max(raw_advantages, axis=1,
                                            keepdims=True)
Q_values = state_values + advantages
model = tf.keras.Model(inputs=[input_states], outputs=[Q_values])
```
The rest is the same code as earlier: