forked from tradytics/surpriver
-
Notifications
You must be signed in to change notification settings - Fork 0
/
detection_engine.py
284 lines (238 loc) · 13.7 KB
/
detection_engine.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
# Basic libraries
import os
import ta
import sys
import json
import math
import pickle
import random
import requests
import collections
import numpy as np
from os import walk
import pandas as pd
import yfinance as yf
import datetime as dt
from scipy.stats import linregress
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
from sklearn.ensemble import IsolationForest
from data_loader import DataEngine
import warnings
warnings.filterwarnings("ignore")
# Styling for plots
plt.style.use('seaborn-white')
plt.rc('grid', linestyle="dotted", color='#a0a0a0')
plt.rcParams['axes.edgecolor'] = "#04383F"
# Argument parsing
import argparse
argParser = argparse.ArgumentParser()
argParser.add_argument("--top_n", type=int, default = 25, help="How many top predictions do you want to print")
argParser.add_argument("--min_volume", type=int, default = 5000, help="Minimum volume filter. Stocks with average volume of less than this value will be ignored")
argParser.add_argument("--history_to_use", type=int, default = 7, help="How many bars of 1 hour do you want to use for the anomaly detection model.")
argParser.add_argument("--is_load_from_dictionary", type=int, default = 0, help="Whether to load data from dictionary or get it from yahoo finance.")
argParser.add_argument("--data_dictionary_path", type=str, default = "dictionaries/data_dictionary.npy", help="Data dictionary path.")
argParser.add_argument("--is_save_dictionary", type=int, default = 1, help="Whether to save data in a dictionary.")
argParser.add_argument("--data_granularity_minutes", type=int, default = 15, help="Minute level data granularity that you want to use. Default is 60 minute bars.")
argParser.add_argument("--is_test", type=int, default = 0, help="Whether to test the tool or just predict for future. When testing, you should set the future_bars to larger than 1.")
argParser.add_argument("--future_bars", type=int, default = 25, help="How many bars to keep for testing purposes.")
argParser.add_argument("--volatility_filter", type=float, default = 0.05, help="Stocks with volatility less than this value will be ignored.")
args = argParser.parse_args()
top_n = args.top_n
min_volume = args.min_volume
history_to_use = args.history_to_use
is_load_from_dictionary = args.is_load_from_dictionary
data_dictionary_path = args.data_dictionary_path
is_save_dictionary = args.is_save_dictionary
data_granularity_minutes = args.data_granularity_minutes
is_test = args.is_test
future_bars = args.future_bars
volatility_filter = args.volatility_filter
"""
Sample run:
python detection_engine.py --is_test 1 --future_bars 25 --top_n 25 --min_volume 5000 --data_granularity_minutes 60 --history_to_use 14 --is_load_from_dictionary 0 --data_dictionary_path 'dictionaries/feature_dict.npy' --is_save_dictionary 1
"""
class ArgChecker:
def __init__(self):
print("Checking arguments...")
self.check_arugments()
def check_arugments(self):
granularity_constraints_list = [1, 5, 10, 15, 30, 60]
granularity_constraints_list_string = ''.join(str(value) + "," for value in granularity_constraints_list).strip(",")
if data_granularity_minutes not in granularity_constraints_list:
print("You can only choose the following values for 'data_granularity_minutes' argument -> %s\nExiting now..." % granularity_constraints_list_string)
exit()
if is_test == 1 and future_bars < 2:
print("You want to test but the future bars are less than 2. That does not give us enough data to test the model properly. Please use a value larger than 2.\nExiting now...")
exit()
class Surpriver:
def __init__(self):
print("Surpriver has been initialized...")
self.TOP_PREDICTIONS_TO_PRINT = top_n
self.HISTORY_TO_USE = history_to_use
self.MINIMUM_VOLUME = min_volume
self.IS_LOAD_FROM_DICTIONARY = is_load_from_dictionary
self.DATA_DICTIONARY_PATH = data_dictionary_path
self.IS_SAVE_DICTIONARY = is_save_dictionary
self.DATA_GRANULARITY_MINUTES = data_granularity_minutes
self.IS_TEST = is_test
self.FUTURE_BARS_FOR_TESTING = future_bars
self.VOLATILITY_FILTER = volatility_filter
# Create data engine
self.dataEngine = DataEngine(self.HISTORY_TO_USE, self.DATA_GRANULARITY_MINUTES,
self.IS_SAVE_DICTIONARY, self.IS_LOAD_FROM_DICTIONARY, self.DATA_DICTIONARY_PATH,
self.MINIMUM_VOLUME,
self.IS_TEST, self.FUTURE_BARS_FOR_TESTING,
self.VOLATILITY_FILTER)
def is_nan(self, object):
"""
Checks if a value is null.
"""
return object != object
def calculate_percentage_change(self, old, new):
return ((new - old) * 100) / old
def calculate_return(self, old, new):
return new / old
def parse_large_values(self, value):
if value < 1000:
value = str(value)
elif value >= 1000 and value < 1000000:
value = round(value / 1000, 2)
value = str(value) + "K"
else:
value = round(value / 1000000, 1)
value = str(value) + "M"
return value
def calculate_volume_changes(self, historical_price):
volume = list(historical_price["Volume"])
dates = list(historical_price["Datetime"])
dates = [str(date) for date in dates]
# Get volume by date
volume_by_date_dictionary = collections.defaultdict(list)
for j in range(0, len(volume)):
date = dates[j].split(" ")[0]
volume_by_date_dictionary[date].append(volume[j])
for key in volume_by_date_dictionary:
volume_by_date_dictionary[key] = np.sum(volume_by_date_dictionary[key]) # taking average as we have multiple bars per day.
# Get all dates
all_dates = list(reversed(sorted(volume_by_date_dictionary.keys())))
latest_date = all_dates[0]
latest_data_point = list(reversed(sorted(dates)))[0]
# Get volume information
today_volume = volume_by_date_dictionary[latest_date]
average_vol_last_five_days = np.mean([volume_by_date_dictionary[date] for date in all_dates[1:6]])
average_vol_last_twenty_days = np.mean([volume_by_date_dictionary[date] for date in all_dates[1:20]])
return latest_data_point, self.parse_large_values(today_volume), self.parse_large_values(average_vol_last_five_days), self.parse_large_values(average_vol_last_twenty_days)
def calculate_recent_volatility(self, historical_price):
close_price = list(historical_price["Close"])
volatility_five_bars = np.std(close_price[-5:])
volatility_twenty_bars = np.std(close_price[-20:])
volatility_all = np.std(close_price)
return volatility_five_bars, volatility_twenty_bars, volatility_all
def calculate_future_performance(self, future_data):
CLOSE_PRICE_INDEX = 4
price_at_alert = future_data[0][CLOSE_PRICE_INDEX]
prices_in_future = [item[CLOSE_PRICE_INDEX] for item in future_data[1:]]
prices_in_future = [item for item in prices_in_future if item != 0]
total_sum_percentage_change = abs(sum([self.calculate_percentage_change(price_at_alert, next_price) for next_price in prices_in_future]))
future_volatility = np.std(prices_in_future)
return total_sum_percentage_change, future_volatility
def find_anomalies(self):
"""
Main function that does everything
"""
# Gather data for all stocks
if self.IS_LOAD_FROM_DICTIONARY == 0:
features, historical_price_info, future_prices, symbol_names = self.dataEngine.collect_data_for_all_tickers()
else:
# Load data from dictionary
features, historical_price_info, future_prices, symbol_names = self.dataEngine.load_data_from_dictionary()
# Find anomalous stocks using the Isolation Forest model. Read more about the model at -> https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.IsolationForest.html
detector = IsolationForest(n_estimators = 100, random_state = 0)
detector.fit(features)
predictions = detector.decision_function(features)
# Print top predictions with some statistics
predictions_with_output_data = [[predictions[i], symbol_names[i], historical_price_info[i], future_prices[i]] for i in range(0, len(predictions))]
predictions_with_output_data = list(sorted(predictions_with_output_data))
for item in predictions_with_output_data[:self.TOP_PREDICTIONS_TO_PRINT]:
# Get some stats to print
prediction, symbol, historical_price, future_price = item
# Check if future data is present or not
if self.IS_TEST == 1 and len(future_price) < 5:
print("No future data is present. Please make sure that you ran the prior command with is_test enabled or disable that command now. Exiting now...")
exit()
latest_date, today_volume, average_vol_last_five_days, average_vol_last_twenty_days = self.calculate_volume_changes(historical_price)
volatility_vol_last_five_days, volatility_vol_last_twenty_days, _ = self.calculate_recent_volatility(historical_price)
if average_vol_last_five_days == None or volatility_vol_last_five_days == None:
continue
if self.IS_TEST == 0:
# Not testing so just print the predictions
print("Last Bar Time: %s\nSymbol: %s\nAnomaly Score: %.3f\nToday Volume: %s\nAverage Volume 5d: %s\nAverage Volume 20d: %s\nVolatility 5bars: %.3f\nVolatility 20bars: %.3f\n----------------------" %
(latest_date, symbol, prediction,
today_volume, average_vol_last_five_days, average_vol_last_twenty_days,
volatility_vol_last_five_days, volatility_vol_last_twenty_days))
else:
# Testing so show what happened in the future
future_abs_sum_percentage_change, _ = self.calculate_future_performance(future_price)
print("Last Bar Time: %s\nSymbol: %s\nAnomaly Score: %.3f\nToday Volume: %s\nAverage Volume 5d: %s\nAverage Volume 20d: %s\nVolatility 5bars: %.3f\nVolatility 20bars: %.3f\nFuture Absolute Sum Price Changes: %.2f\n----------------------" %
(latest_date, symbol, prediction,
today_volume, average_vol_last_five_days, average_vol_last_twenty_days,
volatility_vol_last_five_days, volatility_vol_last_twenty_days,
future_abs_sum_percentage_change))
if self.IS_TEST == 1:
self.calculate_future_stats(predictions_with_output_data)
def calculate_future_stats(self, predictions_with_output_data):
"""
Calculate different stats for future data to show whether the anomalous stocks found were actually better than non-anomalous ones
"""
future_change = []
anomalous_score = []
historical_volatilities = []
future_volatilities = []
for item in predictions_with_output_data:
prediction, symbol, historical_price, future_price = item
future_sum_percentage_change, future_volatility = self.calculate_future_performance(future_price)
_, _, historical_volatility = self.calculate_recent_volatility(historical_price)
# Skip for when there is a reverse split, the yfinance package does not handle that well so percentages get weirdly large
if abs(future_sum_percentage_change) > 250 or self.is_nan(future_sum_percentage_change) == True or self.is_nan(prediction) == True:
continue
future_change.append(future_sum_percentage_change)
anomalous_score.append(prediction)
future_volatilities.append(future_volatility)
historical_volatilities.append(historical_volatility)
# Calculate correlation and stats
correlation = np.corrcoef(anomalous_score, future_change)[0, 1]
anomalous_future_changes = np.mean([future_change[x] for x in range(0, len(future_change)) if anomalous_score[x] < 0]) # Anything less than 0 is considered anomalous
normal_future_changes = np.mean([future_change[x] for x in range(0, len(future_change)) if anomalous_score[x] >= 0])
anomalous_future_volatilities = np.mean([future_volatilities[x] for x in range(0, len(future_volatilities)) if anomalous_score[x] < 0]) # Anything less than 0 is considered anomalous
normal_future_volatilities = np.mean([future_volatilities[x] for x in range(0, len(future_volatilities)) if anomalous_score[x] >= 0])
anomalous_historical_volatilities = np.mean([historical_volatilities[x] for x in range(0, len(historical_volatilities)) if anomalous_score[x] < 0]) # Anything less than 0 is considered anomalous
normal_historical_volatilities = np.mean([historical_volatilities[x] for x in range(0, len(historical_volatilities)) if anomalous_score[x] >= 0])
print("\n*************** Future Performance ***************")
print("Correlation between future absolute change vs anomalous score (lower is better, range = (-1, 1)): **%.2f**\nTotal absolute change in future for Anomalous Stocks: **%.3f**\nTotal absolute change in future for Normal Stocks: **%.3f**\nAverage future volatility of Anomalous Stocks: **%.3f**\nAverage future volatility of Normal Stocks: **%.3f**\nHistorical volatility for Anomalous Stocks: **%.3f**\nHistorical volatility for Normal Stocks: **%.3f**\n" % (
correlation,
anomalous_future_changes, normal_future_changes,
anomalous_future_volatilities, normal_future_volatilities,
anomalous_historical_volatilities, normal_historical_volatilities))
# Plot
FONT_SIZE = 14
colors = ['#c91414' if anomalous_score[x] < 0 else '#035AA6' for x in range(0, len(anomalous_score))]
anomalous_vs_normal = np.array([1 if anomalous_score[x] < 0 else 0 for x in range(0, len(anomalous_score))])
plt.scatter(np.array(anomalous_score)[anomalous_vs_normal == 1], np.array(future_change)[anomalous_vs_normal == 1], marker='v', color = '#c91414')
plt.scatter(np.array(anomalous_score)[anomalous_vs_normal == 0], np.array(future_change)[anomalous_vs_normal == 0], marker='P', color = '#035AA6')
plt.axvline(x = 0, linestyle = '--', color = '#848484')
plt.xlabel("Anomaly Score", fontsize = FONT_SIZE)
plt.ylabel("Absolute Future Change", fontsize = FONT_SIZE)
plt.xticks(fontsize = FONT_SIZE)
plt.yticks(fontsize = FONT_SIZE)
plt.legend(["Anomalous", "Normal"], fontsize = FONT_SIZE)
plt.title("Absolute Future Change", fontsize = FONT_SIZE)
plt.tight_layout()
plt.grid()
plt.show()
# Check arguments
argumentChecker = ArgChecker()
# Create surpriver instance
supriver = Surpriver()
# Generate predictions
supriver.find_anomalies()