-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathparse_tcx.py
128 lines (104 loc) · 3.73 KB
/
parse_tcx.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# -*- coding: utf-8 -*-
'''
Partial parser for Garmin Connect tcx files. Returns a Pandas DataFrame.
'''
import os
import sys
import lxml.etree
import numpy as np
import pandas as pd
def get_activity_type(root, garmin_ns):
    '''
    Return the value of the ``Sport`` attribute of the first ``Activity``
    element found below ``root``.

    ``garmin_ns`` is the search prefix that is prepended to the tag name
    before it is handed to ``root.find``.
    '''
    query = garmin_ns + 'Activity'
    activity = root.find(query)
    sport = activity.attrib['Sport']
    return sport
def calculate_speed(distance, time):
    '''
    Return the speed between consecutive samples.

    Parameters
    ----------
    distance : pandas.Series
        Cumulative distance per sample (the caller in this file passes the
        ``DistanceMeters`` column).
    time : pandas.Series
        Datetime-like timestamps, one per sample.

    Returns
    -------
    pandas.Series
        ``3.6 * diff(distance) / diff(time in seconds)``, i.e. km/h when
        the distance is given in metres. The first element is NaN because
        it has no predecessor.
    '''
    # ``.dt.total_seconds()`` always yields float seconds (including any
    # fractional part). ``astype('timedelta64[s]')`` only produced floats
    # on old pandas releases; on pandas >= 2.0 it keeps a timedelta dtype,
    # which cannot be used as the divisor of a float distance.
    dt = time.diff().dt.total_seconds()
    speed = 3.6 * distance.diff() / dt
    return speed
def calculate_slope(distance, altitude):
    '''
    Return the slope angle, in degrees, between consecutive samples.

    The angle is ``arctan2`` of the altitude change over the distance
    change; the first element is NaN because it has no predecessor.
    '''
    run = distance.diff()
    rise = altitude.diff()
    angle_rad = np.arctan2(rise, run)
    return 180/np.pi * angle_rad
def parse_cycling(root, garmin_ns):
    '''
    Collect every ``Trackpoint`` below ``root`` into a DataFrame, one row
    per trackpoint.

    Parameters
    ----------
    root : element
        Root element of the parsed TCX document.
    garmin_ns : str
        Search prefix prepended to ``'Trackpoint'`` for ``root.findall``.

    Returns
    -------
    pandas.DataFrame
        Columns are named after the trackpoint's child tags. ``Time`` is
        parsed with ``pd.to_datetime``, ``HeartRateBpm`` is read from its
        first child, ``Position`` is skipped, and every other child is kept
        only when its text converts to ``float``. A ``Cadence`` column
        filled with NaN is always present.
    '''
    # Rows are gathered in a list and the frame is built once:
    # ``DataFrame.append`` was removed in pandas 2.0 and copied the whole
    # frame on every call.
    rows = []
    for tp in root.findall(garmin_ns + 'Trackpoint'):
        row_data = {}
        # Iterating the element replaces the deprecated ``getchildren()``.
        for c in tp:
            # Comments / processing instructions carry a non-string tag.
            if not isinstance(c.tag, str):
                continue
            # Strip the '{namespace}' prefix; also copes with bare tags.
            name = c.tag.rsplit('}', 1)[-1]
            if 'Time' in name:
                row_data[name] = pd.to_datetime(c.text)
            elif 'Position' in name:
                continue
            elif 'HeartRateBpm' in name:
                # The numeric value lives in the first child element.
                row_data[name] = float(c[0].text)
            else:
                try:
                    row_data[name] = float(c.text)
                except (TypeError, ValueError):
                    # Missing or non-numeric text: not a plain number,
                    # so the field is deliberately left out.
                    pass
        rows.append(row_data)
    df = pd.DataFrame(rows)
    # NOTE(review): this overwrites any ``Cadence`` value parsed above with
    # NaN. Kept as in the original; confirm whether that is intended.
    df['Cadence'] = np.nan
    return df
def parse_running(root, garmin_ns):
    '''
    Collect every ``Trackpoint`` below ``root`` into a DataFrame, one row
    per trackpoint.

    Parameters
    ----------
    root : element
        Root element of the parsed TCX document.
    garmin_ns : str
        Search prefix prepended to ``'Trackpoint'`` for ``root.findall``.

    Returns
    -------
    pandas.DataFrame
        Columns are named after the trackpoint's child tags. ``Time`` is
        parsed with ``pd.to_datetime``; ``Position`` is split into
        ``Latitude`` and ``Longitude``; ``HeartRateBpm`` is read from its
        first child; a ``Cadence`` value found in the first child of
        ``Extensions`` is stored doubled; every other child is kept only
        when its text converts to ``float``.
    '''
    # Rows are gathered in a list and the frame is built once:
    # ``DataFrame.append`` was removed in pandas 2.0 and copied the whole
    # frame on every call.
    rows = []
    for tp in root.findall(garmin_ns + 'Trackpoint'):
        row_data = {}
        # Iterating the element replaces the deprecated ``getchildren()``.
        for c in tp:
            # Comments / processing instructions carry a non-string tag.
            if not isinstance(c.tag, str):
                continue
            # Strip the '{namespace}' prefix; also copes with bare tags.
            name = c.tag.rsplit('}', 1)[-1]
            if 'Time' in name:
                row_data[name] = pd.to_datetime(c.text)
            elif 'Position' in name:
                # Relies on child order: latitude first, longitude second.
                row_data['Latitude'] = float(c[0].text)
                row_data['Longitude'] = float(c[1].text)
            elif 'HeartRateBpm' in name:
                # The numeric value lives in the first child element.
                row_data[name] = float(c[0].text)
            elif 'Extensions' in name:
                # Only the first child of <Extensions> is scanned; an
                # empty <Extensions/> is simply skipped.
                if len(c):
                    for c2 in c[0]:
                        if isinstance(c2.tag, str) and 'Cadence' in c2.tag:
                            # Stored doubled -- presumably the device
                            # reports strides per foot; TODO confirm.
                            row_data['Cadence'] = 2*float(c2.text)
            else:
                try:
                    row_data[name] = float(c.text)
                except (TypeError, ValueError):
                    # Missing or non-numeric text: not a plain number,
                    # so the field is deliberately left out.
                    pass
        rows.append(row_data)
    return pd.DataFrame(rows)
def load_tcx_data(filename):
    '''
    Parse a TCX file and return its trackpoints as a DataFrame.

    The columns produced by ``parse_running`` / ``parse_cycling`` are
    extended with:

    * ``SecondsElapsed`` -- seconds since the first trackpoint,
    * ``Speed`` -- result of ``calculate_speed``,
    * ``Slope`` -- result of ``calculate_slope``.

    Parameters
    ----------
    filename : str
        Path of the TCX file, handed to ``lxml.etree.parse``.

    Raises
    ------
    RuntimeError
        If the activity's sport is neither ``'Running'`` nor ``'Biking'``.
    '''
    root = lxml.etree.parse(filename).getroot()
    # Search prefix of the form './/{namespace}', built from the document's
    # default namespace, so tags are matched at any depth below the root.
    garmin_ns = './/{{{0}}}'.format(root.nsmap[None])
    activity_type = get_activity_type(root, garmin_ns)
    print('Activity type: ', activity_type)

    if activity_type == 'Running':
        df = parse_running(root, garmin_ns)
    elif activity_type == 'Biking':
        df = parse_cycling(root, garmin_ns)
    else:
        raise RuntimeError('Activity not implemented yet: {0}'.format(activity_type))

    # ``.dt.total_seconds()`` always yields float seconds, whereas
    # ``astype('timedelta64[s]')`` keeps a timedelta dtype on pandas >= 2.0.
    elapsed = df['Time'] - df['Time'].iloc[0]
    df['SecondsElapsed'] = elapsed.dt.total_seconds()

    df['Speed'] = calculate_speed(df['DistanceMeters'], df['Time'])
    df['Slope'] = calculate_slope(df['DistanceMeters'], df['AltitudeMeters'])
    return df
def get_activity_data(filename):
    '''
    Return the activity DataFrame for a TCX file, cached as HDF5.

    The cache file sits next to the input and shares its name with the
    last four characters replaced by ``.h5``. If that file exists the data
    is read from it; otherwise the TCX file is parsed with
    ``load_tcx_data`` and the result is written to the cache first.

    Parameters
    ----------
    filename : str
        Path of the TCX file.

    Returns
    -------
    pandas.DataFrame
    '''
    # Drops the last four characters, i.e. assumes a '.tcx'-style suffix.
    basename = filename[:-4]
    filename_df = basename + '.h5'
    if os.path.exists(filename_df):
        print('Loading data from HDF5 file')
        df = pd.read_hdf(filename_df, key='ActivityData')
    else:
        df = load_tcx_data(filename)
        print('Storing data to HDF5 file: ', filename_df)
        # ``key`` is passed by keyword: positional use is deprecated in
        # recent pandas releases.
        df.to_hdf(filename_df, key='ActivityData')
    return df
if __name__ == '__main__':
    # Script entry point: parse the TCX file named on the command line and
    # print summary statistics of the resulting DataFrame.
    if len(sys.argv) < 2:
        # Exit with a usage hint instead of an IndexError traceback.
        sys.exit('Usage: {0} <activity.tcx>'.format(sys.argv[0]))
    df = load_tcx_data(sys.argv[1])
    print(df.describe())