-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathextract_negedge.py
111 lines (101 loc) · 3.66 KB
/
extract_negedge.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import csv,sys
import queue
from statistics import mean, median, variance, stdev
from scipy.stats import skew, kurtosis
from functools import reduce
from math import sqrt,floor,ceil
import numpy as np
def rms(xs):
return sqrt(reduce(lambda a, x: a + x * x, xs, 0) / len(xs))
def en(xs):
return reduce(lambda a, x: a + x * x, xs, 0) / len(xs)
idx = 0
negedge_term = 0
threshold = 2.97
can_dom_bit_idx = 0 # 0 to 2000
can_res_bit_idx = 0 # 0 to 2000
can_signal = []
SOF = False
NEGEDGE = False
negedge_q = queue.Queue()
negedge_list = []
prev_can_signal_len = -1
sampling_rate = int(sys.argv[2])
#print(sampling_rate, 'MS/s')
#print(floor(float(1/sampling_rate)*1000), 'ns resolution')
skip_duration = floor(float(1/sampling_rate)*1000/2)
#print(skip_duration, 'skip duration')
queue_length = ceil(sampling_rate/2.0)+1
#print(queue_length, 'queue length')
buffering_term = floor((sampling_rate/2.0+1)/2)
#print(buffering_term, 'buffering term')
with open(sys.argv[1]) as f:
while True:
idx += 1
row = f.readline()
if idx == 1 or idx == 2 or idx == 3:
continue
#print(row, end='')
try :
v_value = float(row.split(',')[1])
except IndexError:
break
# estimate can bit signal
if v_value >= threshold :
#print(v_value)
SOF = True
can_dom_bit_idx += 2
elif SOF == True and v_value < threshold :
can_res_bit_idx += 2
if can_dom_bit_idx == 1000:
can_signal.append('0')
elif can_res_bit_idx == 1000:
can_signal.append('1')
elif can_res_bit_idx >= 2000:
can_res_bit_idx = 0
elif can_dom_bit_idx >= 2000:
can_dom_bit_idx = 0
if idx % skip_duration != 0:
continue
negedge_q.put(v_value)
if negedge_q.qsize() > queue_length:
negedge_q.get()
# extract negedge edge
try :
if negedge_q.queue[0] - negedge_q.queue[-1] >= 0.5 and prev_can_signal_len != len(can_signal) :
NEGEDGE = True
#print(len(can_signal), negedge_q.queue[0], negedge_q.queue[-1])
prev_can_signal_len = len(can_signal)
if NEGEDGE == True:
negedge_term += 1
if negedge_term >= buffering_term and NEGEDGE == True:
for q_item in negedge_q.queue:
negedge_list.append(q_item)
#print("Negedge Edge: ", q_item, len(can_signal))
NEGEDGE = False
negedge_term = 0
negedge_q.empty()
#print("=================================")
except IndexError :
continue
# feature extraction
fft_negedge_list = abs(np.fft.fft(negedge_list))
print('{:.4f}'.format(mean(negedge_list)),\
'{:.4f}'.format(stdev(negedge_list)),\
'{:.4f}'.format(variance(negedge_list)),\
'{:.4f}'.format(skew(negedge_list)),\
'{:.4f}'.format(kurtosis(negedge_list)),\
'{:.4f}'.format(max(negedge_list)),\
'{:.4f}'.format(min(negedge_list)),\
'{:.4f}'.format(rms(negedge_list)),\
'{:.4f}'.format(en(negedge_list)),\
'{:.4f}'.format(mean(fft_negedge_list)),\
'{:.4f}'.format(stdev(fft_negedge_list)),\
'{:.4f}'.format(variance(fft_negedge_list)),\
'{:.4f}'.format(skew(fft_negedge_list)),\
'{:.4f}'.format(kurtosis(fft_negedge_list)),\
'{:.4f}'.format(max(fft_negedge_list)),\
'{:.4f}'.format(min(fft_negedge_list)),\
'{:.4f}'.format(rms(fft_negedge_list)),\
'{:.4f}'.format(en(fft_negedge_list))
)