-
Notifications
You must be signed in to change notification settings - Fork 0
/
first_try_amc_controller.py
83 lines (66 loc) · 2.88 KB
/
first_try_amc_controller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
"""
Embedded Python Blocks:
Each time this file is saved, GRC will instantiate the first class it finds
to get ports and parameters of your block. The arguments to __init__ will
be the parameters. All of them are required to have default values!
"""
import numpy as np
from gnuradio import gr
class blk(gr.sync_block):  # other base classes are basic_block, decim_block, interp_block
    """Embedded Python Block - AMC controller driven by packet loss.

    Takes the packet numbers of the received packets, looks at them through a
    sliding window of ``window_size`` items and estimates how many packets
    were lost inside that window.  The resulting packet error rate (PER) is
    averaged over the last ``average_length`` windows and used to step a
    three-level modulation state up or down.

    As of today it only works for window sizes that are multiples of the
    length of the data.

    Input 0:  packet numbers (``np.int8``).
    Output 0: averaged PER (``np.float32``).
    Output 1: selected state (``np.int8``): 0 = BPSK, 1 = QPSK, 2 = 8PSK.
    """

    # Values written to ``self.state`` / output 1.
    _BPSK = 0
    _QPSK = 1
    _PSK8 = 2

    # Averaged PER at or below which the controller moves one state up.
    _STEP_UP_MAX_PER = 0.35
    # Averaged PER at or above which the controller leaves 8PSK for QPSK.
    _STEP_DOWN_MIN_PER = 0.55

    def __init__(self, window_size=1, modulus=1, average_length=10, reset_call=False):  # only default arguments here
        """arguments to this function show up as parameters in GRC

        window_size:    number of consecutive packet numbers inspected at once.
        modulus:        modulus applied to the difference between consecutive
                        packet numbers before counting losses.
        average_length: number of most recent PER values that are averaged.
        reset_call:     when true, forces the state back to BPSK from QPSK
                        or 8PSK on the next PER update.
        """
        gr.sync_block.__init__(
            self,
            name='AMC Controller',  # will show up in GRC
            in_sig=[np.int8],
            out_sig=[np.float32, np.int8]
        )
        # if an attribute with the same name as a parameter is found,
        # a callback is registered (properties work, too).
        self.window_size = window_size
        self.average_length = average_length
        self.modulus = modulus
        # Most recent PER values, newest at index 0; starts out all zeros.
        # NOTE(review): this attribute name may shadow a history() accessor
        # of the base block - confirm against the GNU Radio version in use.
        self.history = np.zeros(self.average_length)
        self.reset_call = reset_call
        self.state = self._BPSK  # 0 = BPSK 1 = QPSK 2 = 8PSK
        # Ask the scheduler for window_size items of input history so every
        # output item has a full window available.
        self.set_history(window_size)

    def calc_average_per(self, new_per):
        """Record ``new_per``, update the modulation state, return the mean PER.

        The newest value is pushed to the front of ``self.history`` (the
        oldest one falls off the end) and the mean over the whole buffer
        drives the state machine:

        * BPSK:  move to QPSK when the mean is <= 0.35 (``reset_call`` is
          not consulted in this state).
        * QPSK:  go to BPSK if ``reset_call`` is set; otherwise move to 8PSK
          when the mean is <= 0.35, else fall back to BPSK.
        * 8PSK:  go to BPSK if ``reset_call`` is set; otherwise drop to QPSK
          when the mean is >= 0.55, else stay.
        """
        self.history = np.roll(self.history, 1)
        self.history[0] = new_per
        average_per = np.mean(self.history)

        if self.state == self._BPSK:
            if average_per <= self._STEP_UP_MAX_PER:
                self.state = self._QPSK
        elif self.state == self._QPSK:
            if self.reset_call:
                self.state = self._BPSK
            elif average_per <= self._STEP_UP_MAX_PER:
                self.state = self._PSK8
            else:
                self.state = self._BPSK
        else:
            if self.reset_call:
                self.state = self._BPSK
            elif average_per >= self._STEP_DOWN_MIN_PER:
                self.state = self._QPSK
        # print("History {} , Average {}".format(self.history,average_per))
        return average_per

    def work(self, input_items, output_items):
        """Compute the averaged PER and state for every available window.

        Returns 0 (nothing produced) while fewer than ``window_size`` windows
        are available; otherwise returns ``len(output_items[0])``.
        """
        samples = input_items[0]
        consumed = len(samples) - self.window_size + 1
        if consumed < self.window_size:
            return 0

        # Map the signed int8 packet numbers onto 0..255.  Widen first: the
        # literal 256 does not fit int8, and the differences taken below need
        # headroom as well.
        numbers = np.asarray(samples).astype(np.int32) % 256

        for i in range(consumed):
            observed = numbers[i:i + self.window_size]
            # Consecutive numbers that advance by exactly one contribute 0;
            # a larger step (mod ``modulus``) contributes the packets skipped.
            errors = int(np.sum(np.diff(observed) % self.modulus - 1))
            # Lost packets over (received + lost), clamped at zero because
            # ``errors`` can be negative (e.g. repeated packet numbers).
            per = max(float(errors) / (self.window_size + errors), 0)
            # print("Observed {}".format(observed))
            # print("PER {}".format(per))
            output_items[0][i] = self.calc_average_per(per)
            output_items[1][i] = self.state
        return len(output_items[0])