-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqpsk_tx.py
154 lines (124 loc) · 5.18 KB
/
qpsk_tx.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
from pynq import DefaultIP # 提供dma和mmio
from pynq import DefaultHierarchy # 提供多IP分层驱动
from pynq import allocate # 提供内存分配
import numpy as np
# 设计pynq 技术方法来源于
# https://github.com/Xilinx/PYNQ/blob/master/docs/source/overlay_design_methodology/overlay_tutorial.ipynb
class QPSKTx(DefaultHierarchy):
def __init__(self, description, pkt_sym=16, pkt_time=128, pkt_fft=1024):
"""Driver for our QPSK TX IP hierarchy
This encompasses the qpsk tx logic and the DMAs for data
transfer of exposed signals.
"""
super().__init__(description)
self.buf_fft = allocate(shape=(pkt_fft, ), dtype=np.uint32)
self.buf_sym = allocate(shape=(pkt_sym, ), dtype=np.uint8)
self.buf_time = allocate(shape=(pkt_time * 2, ), dtype=np.int16)
# QPSK IP General Config
self.qpsk_tx.lfsr_rst = 1
self.qpsk_tx.enable = 1
self.qpsk_tx.packetsize_rf = 1024
self.set_gain(1)
self.qpsk_tx.lfsr_rst = 0
# QPSK IP Symbol Config
self.qpsk_tx.reset_symbol = 1
self.qpsk_tx.packetsize_symbol = pkt_sym - 1
self.qpsk_tx.reset_symbol = 0
self.qpsk_tx.autorestart_symbol = 0
# QPSK IP FFT Config
self.qpsk_tx.reset_fft = 1
self.qpsk_tx.packetsize_fft = pkt_fft - 1
self.qpsk_tx.reset_fft = 0
self.qpsk_tx.autorestart_fft = 0
## QPSK IP Time Config
self.qpsk_tx.reset_time = 1
self.qpsk_tx.packetsize_time = pkt_time - 1
self.qpsk_tx.reset_time = 0
self.qpsk_tx.autorestart_time = 0
def set_gain(self, normalized_gain):
scaling_factor = 0.65
gain = np.uint32( round(normalized_gain * scaling_factor * (2**32 - 1)) )
self.qpsk_tx.mmio.array[44>>2] = gain
def get_shaped_fft(self):
"""Get a single buffer of FFT data from the pulse shaped signal
"""
self.qpsk_tx.transfer_fft = 1
self.dma_tx_fft.recvchannel.transfer(self.buf_fft)
self.dma_tx_fft.recvchannel.wait()
self.qpsk_tx.transfer_fft = 0
return np.array(self.buf_fft)
def get_shaped_time(self):
"""Get a single buffer of time domain data from the pulse shaped signal
"""
self.qpsk_tx.transfer_time = 1
self.dma_tx_time.recvchannel.transfer(self.buf_time)
self.dma_tx_time.recvchannel.wait()
self.qpsk_tx.transfer_time = 0
t_data = np.array(self.buf_time)
c_data = t_data[::2] + 1j * t_data[1::2]
return c_data
def get_many_shaped_time(self, N=10):
"""Get N buffers of time domain data from the pulse shaped signal
"""
return np.concatenate([self.get_shaped_time() for i in range(N)])
def get_symbols(self):
"""Get a single buffer of raw QPSK symbols
"""
def raw_to_i(raw):
i = raw & 0b0011
if i == 3:
return -1
else:
return 1
def raw_to_q(raw):
i = (raw & 0b1100) >> 2
if i == 3:
return -1
else:
return 1
self.qpsk_tx.transfer_symbol = 1
self.dma_tx_symbol.recvchannel.transfer(self.buf_sym)
self.dma_tx_symbol.recvchannel.wait()
self.qpsk_tx.transfer_symbol = 0
raw_data = np.array(self.buf_sym)
c_data = np.array([raw_to_i(e) + 1j * raw_to_q(e) for e in raw_data])
return c_data
def get_many_symbols(self, N=10):
"""Get N buffers of raw QPSK symbols
"""
return np.concatenate([self.get_symbols() for i in range(N)])
@staticmethod
def checkhierarchy(description):
if 'dma_tx_fft' in description['ip'] \
and 'dma_tx_time' in description['ip'] \
and 'dma_tx_symbol' in description['ip'] \
and 'qpsk_tx' in description['ip']:
return True
return False
class QPSKTxCore(DefaultIP):
"""Driver for QPSK TX's core logic IP
TxCore直接使用mmio发送动作指令
Exposes all the configuration registers by name via data-driven properties
"""
def __init__(self, description):
super().__init__(description=description)
# 包装IP的驱动
bindto = ['UoS:RFSoC:axi_qpsk_tx:5.3']
# LUT of property addresses for our data-driven properties,来源于axi_qpsk_hw.h
_qpsk_props = [("transfer_symbol", 0), ("transfer_fft", 4),
("transfer_time", 60), ("reset_symbol", 8), ("reset_fft", 12),
("reset_time", 48), ("packetsize_symbol", 16),
("packetsize_rf", 20), ("packetsize_fft", 24),
("packetsize_time", 52), ("autorestart_symbol", 36),
("autorestart_fft", 40), ("autorestart_time", 56),
("lfsr_rst", 28), ("enable", 32), ("output_gain", 44)]
# Func to return a MMIO getter and setter based on a relative addr
def _create_mmio_property(addr):
def _get(self):
return self.read(addr)
def _set(self, value):
self.write(addr, value)
return property(_get, _set)
# Generate getters and setters based on _qpsk_props
for (name, addr) in _qpsk_props:
setattr(QPSKTxCore, name, _create_mmio_property(addr))