-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathadapters.py
57 lines (45 loc) · 2.37 KB
/
adapters.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from nmigen import *
from lib.bus.stream.stream import PacketizedStream
from lib.bus.stream.stream_transformer import StreamTransformer
from lib.data_structure.bundle import DOWNWARDS
from lib.peripherals.csr_bank import StatusSignal
from lib.video.image_stream import ImageStream
class ImageStream2PacketizedStream(Elaboratable):
    """Adapt an ImageStream into a PacketizedStream that carries one packet per frame.

    The output `last` flag is driven from the input `frame_last` flag. Every
    out-of-band signal of the input other than `frame_last` and `line_last`
    gets a same-named signal on the output which is driven from the input.
    """

    def __init__(self, input: ImageStream):
        self.input = input
        self.output = PacketizedStream(self.input.payload.shape(), name="packetized_image_stream")

        # The image framing flags are not mirrored as extra signals;
        # `frame_last` is mapped onto the packet `last` flag in elaborate().
        framing_flags = ("frame_last", "line_last")
        self.additional_signal_names = [
            signal_name
            for signal_name in self.input.out_of_band_signals.keys()
            if signal_name not in framing_flags
        ]

        # NOTE(review): Signal() without a shape is presumably 1 bit wide, so a
        # wider out-of-band signal would be truncated here - confirm.
        for signal_name in self.additional_signal_names:
            setattr(self.output, signal_name, Signal(name=signal_name) @ DOWNWARDS)

    def elaborate(self, platform):
        m = Module()

        with StreamTransformer(self.input, self.output, m):
            # Data passes through unchanged; the end of a frame ends the packet.
            m.d.comb += [
                self.output.payload.eq(self.input.payload),
                self.output.last.eq(self.input.frame_last),
            ]
            # Drive each mirrored out-of-band signal from its input counterpart.
            for signal_name in self.additional_signal_names:
                target = getattr(self.output, signal_name)
                source = getattr(self.input, signal_name)
                m.d.comb += target.eq(source)

        return m
class PacketizedStream2ImageStream(Elaboratable):
    """Convert a Packetized stream to an Image stream by creating lines with `width`

    Each input packet becomes one frame: the word flagged `last` on the input is
    flagged `frame_last` on the output. Within a frame, every `width`-th word is
    flagged `line_last`.

    :param input: the packetized stream to adapt
    :param width: the number of words that make up one line of the image

    `not_exact_number_of_lines_error` is incremented whenever a packet ends on a
    word that is not the last word of a line, i.e. when the packet length is not
    a multiple of `width`.
    """

    def __init__(self, input: PacketizedStream, width):
        self.input = input
        self.width = width
        self.not_exact_number_of_lines_error = StatusSignal(32)
        self.output = ImageStream(input.payload.shape(), name="adapted_image_stream")

    def elaborate(self, platform):
        m = Module()

        # Position of the current word inside its line (0 .. width - 1).
        line_ctr = Signal(16)

        # NOTE(review): StreamTransformer presumably forwards the handshake and
        # payload and applies the enclosed statements per transferred word -
        # confirm against lib.bus.stream.stream_transformer.
        with StreamTransformer(self.input, self.output, m):
            with m.If(self.input.last):
                # End of packet: close the frame and start counting afresh.
                m.d.comb += self.output.frame_last.eq(1)
                m.d.sync += line_ctr.eq(0)
                # A well-formed frame ends on the last word of a line.
                with m.If(line_ctr != self.width - 1):
                    m.d.sync += self.not_exact_number_of_lines_error.eq(self.not_exact_number_of_lines_error + 1)
            with m.Elif(line_ctr < self.width - 1):
                m.d.sync += line_ctr.eq(line_ctr + 1)
            with m.Else():
                # Last word of the line: flag it and wrap the counter in the
                # same cycle so that every line is exactly `width` words long.
                # (Previously the counter ran up to `width` and spent an extra
                # word resetting, making all lines but the first one word too
                # long and tripping the error counter on multi-line frames.)
                m.d.comb += self.output.line_last.eq(1)
                m.d.sync += line_ctr.eq(0)

        return m