-
Notifications
You must be signed in to change notification settings - Fork 4
/
bit_stuffing.py
56 lines (44 loc) · 2.68 KB
/
bit_stuffing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from nmigen import *
from lib.bus.stream.stream import PacketizedStream
from lib.data_structure.bundle import DOWNWARDS
class VariableWidthStream(PacketizedStream):
"""
A stream that can indicate that only n bits of the payload are relevant.
"""
def __init__(self, payload_shape, name=None, reset_width=0, src_loc_at=1):
super().__init__(payload_shape, name, src_loc_at=1 + src_loc_at)
self.current_width = Signal(range(len(self.payload)), reset=reset_width) @ DOWNWARDS
class BitStuffer(Elaboratable):
"""stuffs bits from a VariableWidthStream into a dense Stream"""
def __init__(self, input: VariableWidthStream, output_width):
self.input = input
self.output = PacketizedStream(output_width, name="bit_stuffer_output")
def elaborate(self, platform):
m = Module()
output_width = len(self.output.payload)
shift_register = Signal(len(self.input.payload) + output_width)
input_read = (self.input.ready & self.input.valid)
output_write = (self.output.ready & self.output.valid)
current_bits_in_shift_register = Signal(range(len(shift_register)))
flush = Signal()
with m.If(input_read & ~output_write):
with m.If(self.input.last):
m.d.sync += flush.eq(1)
m.d.sync += current_bits_in_shift_register.eq(current_bits_in_shift_register + self.input.current_width)
m.d.sync += shift_register.eq((self.input.payload << current_bits_in_shift_register) | shift_register)
with m.Elif(~input_read & output_write):
with m.If(flush & (current_bits_in_shift_register <= output_width)):
m.d.sync += flush.eq(0)
m.d.comb += self.output.last.eq(1)
m.d.sync += current_bits_in_shift_register.eq(0)
m.d.sync += current_bits_in_shift_register.eq(current_bits_in_shift_register - output_width)
m.d.sync += shift_register.eq(shift_register[output_width:])
with m.Elif(input_read & output_write):
with m.If(self.input.last):
m.d.sync += flush.eq(1)
m.d.sync += current_bits_in_shift_register.eq(current_bits_in_shift_register + self.input.current_width - output_width)
m.d.sync += shift_register.eq((self.input.payload << (current_bits_in_shift_register - output_width)) | (shift_register >> output_width))
m.d.comb += self.output.payload.eq(shift_register[:output_width])
m.d.comb += self.output.valid.eq((current_bits_in_shift_register >= output_width) | flush)
m.d.comb += self.input.ready.eq(((len(shift_register) - current_bits_in_shift_register) >= len(self.input.payload)) | ~flush)
return m