-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathalpha1_aerob_threshold estimation.py
163 lines (137 loc) · 5.96 KB
/
alpha1_aerob_threshold estimation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import math
from sklearn.linear_model import LinearRegression
def DFA(pp_values, lower_scale_limit, upper_scale_limit):
scaleDensity = 30 # scales DFA is conducted between lower_scale_limit and upper_scale_limit
m = 1 # order of polynomial fit (linear = 1, quadratic m = 2, cubic m = 3, etc...)
# initialize, we use logarithmic scales
start = np.log(lower_scale_limit) / np.log(10)
stop = np.log(upper_scale_limit) / np.log(10)
scales = np.floor(np.logspace(np.log10(math.pow(10, start)), np.log10(math.pow(10, stop)), scaleDensity))
F = np.zeros(len(scales))
count = 0
for s in scales:
rms = []
# Step 1: Determine the "profile" (integrated signal with subtracted offset)
x = pp_values
y_n = np.cumsum(x - np.mean(x))
# Step 2: Divide the profile into N non-overlapping segments of equal length s
L = len(x)
shape = [int(s), int(np.floor(L/s))]
nwSize = int(shape[0]) * int(shape[1])
# beginning to end, here we reshape so that we have a number of segments based on the scale used at this cycle
Y_n1 = np.reshape(y_n[0:nwSize], shape, order="F")
Y_n1 = Y_n1.T
# end to beginning
Y_n2 = np.reshape(y_n[len(y_n) - (nwSize):len(y_n)], shape, order="F")
Y_n2 = Y_n2.T
# concatenate
Y_n = np.vstack((Y_n1, Y_n2))
# Step 3: Calculate the local trend for each 2Ns segments by a least squares fit of the series
for cut in np.arange(0, 2 * shape[1]):
xcut = np.arange(0, shape[0])
pl = np.polyfit(xcut, Y_n[cut,:], m)
Yfit = np.polyval(pl, xcut)
arr = Yfit - Y_n[cut,:]
rms.append(np.sqrt(np.mean(arr * arr)))
if (len(rms) > 0):
F[count] = np.power((1 / (shape[1] * 2)) * np.sum(np.power(rms, 2)), 1/2)
count = count + 1
pl2 = np.polyfit(np.log2(scales), np.log2(F), 1)
alpha = pl2[0]
return alpha
def computeFeatures(df):
features = []
step = 120
for index in range(0, int(round(np.max(x)/step))):
array_rr = df.loc[(df['timestamp'] >= (index*step)) & (df['timestamp'] <= (index+1)*step), 'RR']*1000
# compute heart rate
heartrate = round(60000/np.mean(array_rr), 2)
# compute rmssd
NNdiff = np.abs(np.diff(array_rr))
rmssd = round(np.sqrt(np.sum((NNdiff * NNdiff) / len(NNdiff))), 2)
# compute sdnn
sdnn = round(np.std(array_rr), 2)
#dfa, alpha 1
alpha1 = DFA(array_rr.to_list(), 4, 16)
curr_features = {
'timestamp': index,
'heartrate': heartrate,
'rmssd': rmssd,
'sdnn': sdnn,
'alpha1': alpha1,
}
features.append(curr_features)
features_df = pd.DataFrame(features)
return features_df
#-------------------------------------------------------------------------------------------------------------------------
# Read the data
df = pd.read_csv('ACR_HRV_Data_3.csv')# Data from 2021/01/01 till present
# type of activities to be used for analisys (running, cycling, all.)
sport = 'cycling'
if sport=='cycling':
df = df.loc[df['sport'] == 'cycling']
plot_title = 'Aerobic threshold heartrate derived from DFA alpha1 (cycling)'
elif sport=='running':
df = df.loc[df['sport'] == 'running']
plot_title = 'Aerobic threshold heartrate derived from DFA alpha1 (running)'
else :
df = df
plot_title = 'Aerobic threshold heartrate derived from DFA alpha1'
# create an array of all unique gc_activity_id values
gc_activity_id_list = df['gc_activity_id'].unique()
results = []
# Iterate through the list of gc_activity_id values.
for gc_activity_id in gc_activity_id_list:
print(gc_activity_id)
# for each gc_activity_id in the list create a list of corresponding values in the hrv column
RRs = df.loc[df['gc_activity_id'] == gc_activity_id, 'hrv'].tolist()
# convert the list of strings to a list of floats
RRs = [float(i) for i in RRs]
# remove values > 1 std from the mean
mean = np.mean(RRs)
std = np.std(RRs)
print(std)
RRs = [x for x in RRs if (x > mean - std)]
RRs = [x for x in RRs if (x < mean + std)]
# Correct artifacts - Create a 'filteredRRs' list of values that are within 5% beat to beat difference and ignore the rest
artifact_correction_threshold = 0.05
filtered_RRs = []
for i in range(len(RRs)):
if RRs[(i-1)]*(1-artifact_correction_threshold) < RRs[i] < RRs[(i-1)]*(1+artifact_correction_threshold):
filtered_RRs.append(RRs[i])
# Compute the cumulative sum of the filtered RRs to get the timestamps
x = np.cumsum(filtered_RRs)
# Create a dataframe with the timestamps and the filtered RRs
df_1 = pd.DataFrame()
df_1['timestamp'] = x
df_1['RR'] = filtered_RRs
try:
# Compute features
features_df = computeFeatures(df_1)
print(features_df.head())
length = len(features_df['alpha1'])
# define linear regression model and fit values
reg = LinearRegression().fit(features_df['alpha1'].values.reshape(length, 1), features_df['heartrate'].values.reshape(length, 1))
# predict heartrate value at DFA alpha1 0.75
prediction = reg.predict(np.array(0.75).reshape(1, 1))
print(math.floor(prediction))
# append result to list
results.append(math.floor(prediction))
except:
continue
# Plot the results
plt.scatter(range(len(results)), results, c=results, cmap='viridis', alpha=0.5)
plt.colorbar()
# compute and plot a curved line of best fit
z = np.polyfit(range(len(results)), results, 3)
p = np.poly1d(z)
plt.plot(range(len(results)), p(range(len(results))), "r--")
#labels
plt.xlabel('Activity')
plt.ylabel('Predicted aerobic threshold heartrate')
#title
plt.title(plot_title)
plt.show()