-
Notifications
You must be signed in to change notification settings - Fork 722
/
Copy pathStockTradingEnv0.py
153 lines (123 loc) · 5.42 KB
/
StockTradingEnv0.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import random
import json
import gym
from gym import spaces
import pandas as pd
import numpy as np
# Scaling constants used by StockTradingEnv to normalise observation features.

# Divisor for the cash balance and peak net worth features (2**31 - 1).
MAX_ACCOUNT_BALANCE = 2147483647
# Divisor for the shares-held and total-shares-sold features (2**31 - 1).
MAX_NUM_SHARES = 2147483647
# Divisor for the open/high/low/close prices and the average cost basis.
MAX_SHARE_PRICE = 5000
# Divisor for the 'volume' column.
MAX_VOLUME = 1000e8
# Divisor for the 'amount' column.
MAX_AMOUNT = 3e10
# Not referenced by the code visible in this file.
MAX_OPEN_POSITIONS = 5
# Step-count scale; it does not feed into any value the environment returns.
MAX_STEPS = 20000
# Not referenced by the code visible in this file.
MAX_DAY_CHANGE = 1
# Cash balance at the start of every episode; also the baseline that profit
# and the reward sign are measured against.
INITIAL_ACCOUNT_BALANCE = 10000
class StockTradingEnv(gym.Env):
    """A stock trading environment for OpenAI gym.

    The agent trades a single instrument described by the rows of ``df``.
    Each step consumes one row; when the data is exhausted the step counter
    wraps back to the first row so training can continue.

    Actions are a pair ``(action_type, amount)``:

    * ``action_type < 1``       -- buy, spending ``amount`` (a fraction) of
      the shares the current balance could afford;
    * ``1 <= action_type < 2``  -- sell ``amount`` (a fraction) of the
      shares currently held;
    * otherwise                 -- hold.

    ``reset()`` must be called before ``step()`` or ``render()``: it is what
    creates the account state and the step counter.

    NOTE(review): rows are addressed with ``df.loc[current_step, ...]``, so
    ``df`` presumably carries a default 0..n-1 integer index -- confirm
    against the caller.
    """

    metadata = {'render.modes': ['human']}

    # (column, divisor) pairs that make up the market part of an observation,
    # in output order.
    # NOTE(review): 'pctChg' appears twice with different divisors. This is
    # kept as-is so the 19-feature layout is unchanged, but it looks like a
    # copy/paste slip for another column -- confirm against the data schema.
    _MARKET_FEATURES = (
        ('open', MAX_SHARE_PRICE),
        ('high', MAX_SHARE_PRICE),
        ('low', MAX_SHARE_PRICE),
        ('close', MAX_SHARE_PRICE),
        ('volume', MAX_VOLUME),
        ('amount', MAX_AMOUNT),
        ('adjustflag', 10),
        ('tradestatus', 1),
        ('pctChg', 100),
        ('peTTM', 1e4),
        ('pbMRQ', 100),
        ('psTTM', 100),
        ('pctChg', 1e3),
    )

    def __init__(self, df):
        """Create the environment over the market data in ``df``.

        Args:
            df: table holding one row per time step with the columns listed
                in ``_MARKET_FEATURES``.
        """
        super().__init__()
        self.df = df

        # step() only ever emits -100 (no profit) or 1 (profit), so declare
        # exactly that range.
        self.reward_range = (-100, 1)

        # Actions of the format Buy x%, Sell x%, Hold, etc.
        self.action_space = spaces.Box(
            low=np.array([0, 0]), high=np.array([3, 1]), dtype=np.float16)

        # One observation is 13 scaled market features for the current step
        # followed by 6 scaled account features.
        self.observation_space = spaces.Box(
            low=0, high=1, shape=(19,), dtype=np.float16)

    def _next_observation(self):
        """Return the 19-element scaled feature vector for the current step."""
        step = self.current_step
        market = [
            self.df.loc[step, column] / divisor
            for column, divisor in self._MARKET_FEATURES
        ]
        account = [
            self.balance / MAX_ACCOUNT_BALANCE,
            self.max_net_worth / MAX_ACCOUNT_BALANCE,
            self.shares_held / MAX_NUM_SHARES,
            self.cost_basis / MAX_SHARE_PRICE,
            self.total_shares_sold / MAX_NUM_SHARES,
            self.total_sales_value / (MAX_NUM_SHARES * MAX_SHARE_PRICE),
        ]
        return np.array(market + account)

    def _take_action(self, action):
        """Apply a buy / sell / hold action and update the account state.

        Args:
            action: indexable pair ``(action_type, amount)``; see the class
                docstring for how the values are interpreted.
        """
        # Set the current price to a random price within the time step
        current_price = random.uniform(
            self.df.loc[self.current_step, "open"],
            self.df.loc[self.current_step, "close"])

        action_type = action[0]
        amount = action[1]

        if action_type < 1:
            # Buy amount % of balance in shares
            total_possible = int(self.balance / current_price)
            shares_bought = int(total_possible * amount)
            prev_cost = self.cost_basis * self.shares_held
            additional_cost = shares_bought * current_price

            self.balance -= additional_cost
            total_shares = self.shares_held + shares_bought
            # Nothing held and nothing bought leaves no position to average;
            # skip the division instead of dividing by zero.
            if total_shares != 0:
                self.cost_basis = (prev_cost + additional_cost) / total_shares
            self.shares_held = total_shares
        elif action_type < 2:
            # Sell amount % of shares held
            shares_sold = int(self.shares_held * amount)
            proceeds = shares_sold * current_price
            self.balance += proceeds
            self.shares_held -= shares_sold
            self.total_shares_sold += shares_sold
            self.total_sales_value += proceeds

        self.net_worth = self.balance + self.shares_held * current_price

        if self.net_worth > self.max_net_worth:
            self.max_net_worth = self.net_worth

        if self.shares_held == 0:
            self.cost_basis = 0

    def step(self, action):
        """Execute one time step within the environment.

        Args:
            action: indexable pair ``(action_type, amount)``.

        Returns:
            Tuple ``(obs, reward, done, info)``: the next observation, a
            reward of 1 when net worth exceeds the initial balance and -100
            otherwise, ``done`` set once net worth is no longer positive,
            and an empty info dict.
        """
        self._take_action(action)

        self.current_step += 1
        if self.current_step > len(self.df) - 1:
            # Loop training: wrap to the first row instead of ending the
            # episode when the data runs out.
            self.current_step = 0

        # Reward depends only on the sign of the profit.
        profit = self.net_worth - INITIAL_ACCOUNT_BALANCE
        reward = 1 if profit > 0 else -100

        done = bool(self.net_worth <= 0)

        obs = self._next_observation()
        return obs, reward, done, {}

    def reset(self, new_df=None):
        """Reset the state of the environment to an initial state.

        Args:
            new_df: optional replacement data set (e.g. a test set). When
                given, it becomes the data used from now on.

        Returns:
            The observation for the first row of the data.
        """
        self.balance = INITIAL_ACCOUNT_BALANCE
        self.net_worth = INITIAL_ACCOUNT_BALANCE
        self.max_net_worth = INITIAL_ACCOUNT_BALANCE
        self.shares_held = 0
        self.cost_basis = 0
        self.total_shares_sold = 0
        self.total_sales_value = 0

        # Pass test dataset to environment. Compare against None explicitly:
        # the truth value of a DataFrame is ambiguous and raises ValueError.
        if new_df is not None:
            self.df = new_df

        # Episodes always start at the first row of the data.
        self.current_step = 0

        return self._next_observation()

    def render(self, mode='human', close=False):
        """Print the current account state and return the profit so far.

        Args:
            mode: accepted for interface compatibility; not used.
            close: accepted for interface compatibility; not used.

        Returns:
            Current net worth minus the initial account balance.
        """
        profit = self.net_worth - INITIAL_ACCOUNT_BALANCE
        print('-'*30)
        print(f'Step: {self.current_step}')
        print(f'Balance: {self.balance}')
        print(f'Shares held: {self.shares_held} (Total sold: {self.total_shares_sold})')
        print(f'Avg cost for held shares: {self.cost_basis} (Total sales value: {self.total_sales_value})')
        print(f'Net worth: {self.net_worth} (Max net worth: {self.max_net_worth})')
        print(f'Profit: {profit}')
        return profit