-
Notifications
You must be signed in to change notification settings - Fork 0
/
rp2_rmt.py
107 lines (95 loc) · 3.38 KB
/
rp2_rmt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# rp2_rmt.py A RMT-like class for the RP2.
# Released under the MIT License (MIT). See LICENSE.
# Copyright (c) 2021 Peter Hinch
from machine import Pin, PWM
import rp2
@rp2.asm_pio(set_init=rp2.PIO.OUT_LOW, autopull=True, pull_thresh=32)
def pulsetrain():
wrap_target()
out(x, 32) # No of 1MHz ticks. Block if FIFO MT at end.
irq(rel(0))
set(pins, 1) # Set pin high
label('loop')
jmp(x_dec,'loop')
irq(rel(0))
set(pins, 0) # Set pin low
out(y, 32) # Low time.
label('loop_lo')
jmp(y_dec,'loop_lo')
wrap()
@rp2.asm_pio(autopull=True, pull_thresh=32)
def irqtrain():
wrap_target()
out(x, 32) # No of 1MHz ticks. Block if FIFO MT at end.
irq(rel(0))
label('loop')
jmp(x_dec,'loop')
wrap()
class DummyPWM:
def duty_u16(self, _):
pass
class RP2_RMT:
def __init__(self, pin_pulse=None, carrier=None, sm_no=0, sm_freq=1_000_000):
if carrier is None:
self.pwm = DummyPWM()
self.duty = (0, 0)
else:
pin_car, freq, duty = carrier
self.pwm = PWM(pin_car) # Set up PWM with carrier off.
self.pwm.freq(freq)
self.pwm.duty_u16(0)
self.duty = (int(0xffff * duty // 100), 0)
if pin_pulse is None:
self.sm = rp2.StateMachine(sm_no, irqtrain, freq=sm_freq)
else:
self.sm = rp2.StateMachine(sm_no, pulsetrain, freq=sm_freq, set_base=pin_pulse)
self.apt = 0 # Array index
self.arr = None # Array
self.ict = None # Current IRQ count
self.icm = 0 # End IRQ count
self.reps = 0 # 0 == forever n == no. of reps
rp2.PIO(0).irq(self._cb)
# IRQ callback. Because of FIFO IRQ's keep arriving after STOP.
def _cb(self, pio):
self.pwm.duty_u16(self.duty[self.ict & 1])
self.ict += 1
if d := self.arr[self.apt]: # If data available feed FIFO
self.sm.put(d)
self.apt += 1
else:
if r := self.reps != 1: # All done if reps == 1
if r: # 0 == run forever
self.reps -= 1
self.sm.put(self.arr[0])
self.apt = 1 # Set pointer and count to state
self.ict = 1 # after 1st IRQ
# Arg is an array of times in μs terminated by 0.
def send(self, ar, reps=1, check=True):
self.sm.active(0)
self.reps = reps
ar[-1] = 0 # Ensure at least one STOP
for x, d in enumerate(ar): # Find 1st STOP
if d == 0:
break
if check:
# Pulse train must end with a space otherwise we leave carrier on.
# So, if it ends with a mark, append a space. Note __init__.py
# ensures that there is room in array.
if (x & 1):
ar[x] = 1 # space. Duration doesn't matter.
x += 1
ar[x] = 0 # STOP
self.icm = x # index of 1st STOP
mv = memoryview(ar)
n = min(x, 4) # Fill FIFO if there are enough data points.
self.sm.put(mv[0 : n])
self.arr = ar # Initial conditions for ISR
self.apt = n # Point to next data value
self.ict = 0 # IRQ count
self.sm.active(1)
def busy(self):
if self.ict is None:
return False # Just instantiated
return self.ict < self.icm
def cancel(self):
self.reps = 1