-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcore.py
105 lines (85 loc) · 3.66 KB
/
core.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#!/usr/bin/python3
"""
Author: Gabriel Kulp
Created: 1/19/2017
This module holds global variables and the functions
most fundamental to this program.
"""
# Global setting for amplitude granularity
STEPS_IN_AMPLITUDE = 10
import time
from collections import namedtuple
# Make data structure used for each interval in the timeline
Interval = namedtuple("Interval", "start duration amplitude")
# runs through one cycle of the timeline
def do_cycle(device, timeline, steps, total_time, multiplier):
"""
Runs through one cycle of the gait.
:param device: is the output device
:param timeline: is the 2D array of intervals to read.
:param total_time: is the time it should take to complete the cycle.
:param multiplier: is the maximum amplitude outputted.
"""
# set the index on each channel to 0
curr_index = [0] * len(timeline)
# run through timeline
for curr_step in range(0, steps):
step_start_time = time.time()
# start row with time stamp
print(curr_step, "\t", sep='', end='')
amplitudes = []
# find value of each channel
for chan in range(0, len(timeline)):
# return value and update channel's current index
amp, curr_index[chan] = read_channel(timeline, chan, curr_index[chan], curr_step)
amplitudes.append(amp)
# set all values at once
write_out(device, amplitudes, multiplier)
sleep_time = (total_time/steps) - (time.time() - step_start_time)
if sleep_time > 0:
time.sleep(sleep_time)
# return value on the selected channel given the time
def read_channel(timeline, channel_id, curr_index, curr_time):
"""
Returns the current amplitude specified in the timeline.
:param timeline: is the 2D array of intervals to read.
:param channel_id: is the ID number of the pneumatic valve channel.
:param curr_index: is the index in the timeline to start reading from.
:param curr_time: is the time on the timeline to evaluate.
"""
# while not past last interval
while curr_index < len(timeline[channel_id]):
# get interval to check
curr_interval = timeline[channel_id][curr_index]
# if we're past the starting point
if curr_time >= curr_interval.start:
# if currently in the interval's duration
if curr_time - curr_interval.start < curr_interval.duration:
# return the interval's amplitude and index (since it might have incremented)
return (curr_interval.amplitude, curr_index)
else: # not in duration
# finished one interval; check next interval
curr_index = curr_index + 1
# start at the top of the while loop again,
# but fetching a different interval to test
continue
else: # not past the starting point
# between intervals
return 0, curr_index
# after last interval in timeline
return 0, curr_index
def write_out(device, values, multiplier):
"""
Prints stuff to the console and writes to device pins if available.
:param device: is the device to output to.
:param values: the stuff to print.
:param multiplier: scales the output values, but only to the device. Unscaled
numbers are printed to the screen.
"""
print(values)
scaled_values = []
# re-scales data from internal representation to external input representation.
for value in values:
value = float(value) * float(multiplier) / float(STEPS_IN_AMPLITUDE)
scaled_values.append(value)
device.send(scaled_values)