-
Notifications
You must be signed in to change notification settings - Fork 21
/
LM_neopixel.py
355 lines (313 loc) · 13.1 KB
/
LM_neopixel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
from neopixel import NeoPixel
from machine import Pin
from sys import platform
from utime import sleep_ms
from Common import transition_gen, micro_task
import uasyncio as asyncio
from microIO import resolve_pin, pinmap_search
from random import randint
from Types import resolve
#########################################
# DIGITAL CONTROLLER PARAMS #
#########################################
class Data:
    """Shared, module-wide state of the NeoPixel load module."""
    # Highest value of a single color channel
    CH_MAX = 255
    # Color + state cache, layout: [R, G, B, STATE_ON_OFF]
    DCACHE = [100, 100, 100, 0]
    # NeoPixel driver instance, stays None until it is first created
    NEOPIXEL_OBJ = None
    # True: cache is mirrored into the 'neopixel.pds' file
    PERSISTENT_CACHE = False
    # Tag of the async color transition task
    RGB_TASK_TAG = "neopixel._tran"
    # Run flag of the transition task (False requests a soft stop)
    TASK_STATE = False
#########################################
# DIGITAL rgb WITH 1 "PWM" #
#########################################
def __init_NEOPIXEL(n=24):
    """
    Create the NeoPixel driver on first use and return the cached instance
    :param n: number of led fragments (only used when the driver is created)
    :return: NeoPixel object stored in Data.NEOPIXEL_OBJ
    """
    if Data.NEOPIXEL_OBJ is not None:
        # Already initialized - reuse, n is ignored
        return Data.NEOPIXEL_OBJ
    # Pin is looked up by its logical name 'neop'
    data_pin = Pin(resolve_pin('neop'))
    Data.NEOPIXEL_OBJ = NeoPixel(data_pin, n)
    return Data.NEOPIXEL_OBJ
def __persistent_cache_manager(mode):
    """
    Save or recover Data.DCACHE to/from the 'neopixel.pds' file
    (pds - persistent data structure)
    - does nothing when Data.PERSISTENT_CACHE is disabled
    - recover is best effort: a missing, unreadable or malformed file
      leaves the current cache untouched
    :param mode: 's' - save, any other value (e.g. 'r') - recover
    :return: None
    """
    if not Data.PERSISTENT_CACHE:
        return
    if mode == 's':
        # SAVE CACHE - comma separated values
        with open('neopixel.pds', 'w') as f:
            f.write(','.join([str(k) for k in Data.DCACHE]))
        return
    # RESTORE CACHE
    try:
        with open('neopixel.pds', 'r') as f:
            restored = [float(data) for data in f.read().strip().split(',')]
    except (OSError, ValueError):
        # OSError: file is missing/unreadable, ValueError: non numeric content
        return
    # Accept complete records only, a shorter list would break the
    # Data.DCACHE[0..3] indexing used all over the module
    if len(restored) == len(Data.DCACHE):
        Data.DCACHE = restored
def __state_machine(r, g, b):
    """
    Store the latest output in the cache and save the cache
    - any channel above zero: cache the color with ON state
    - otherwise: keep the cached color and only set the OFF state
    :param r: red value
    :param g: green value
    :param b: blue value
    """
    any_channel_lit = r > 0 or g > 0 or b > 0
    if not any_channel_lit:
        Data.DCACHE[3] = 0                  # OFF, cached color is preserved
    else:
        Data.DCACHE = [r, g, b, 1]          # color + ON
    # Write Data.DCACHE to file (no-op if persistent cache is disabled)
    __persistent_cache_manager('s')
#########################################
# USER FUNCTIONS #
#########################################
def load(cache=None, ledcnt=24):
    """
    Initiate NeoPixel RGB module
    :param cache bool: file state machine cache: True/False/None
        - None: automatic, disabled on 'esp8266' platform, enabled otherwise
        - Load .pds (state machine cache) for this load module
        - Apply loaded ON state to the LEDs (boot function)
    :param ledcnt int: number of LED elements handed to the NeoPixel init
    :return str: cache state and LED count
    """
    if cache is not None:
        Data.PERSISTENT_CACHE = cache
    else:
        Data.PERSISTENT_CACHE = platform != 'esp8266'
    # Recover cached data (no-op when the cache is disabled)
    __persistent_cache_manager('r')
    led_count = __init_NEOPIXEL(n=ledcnt).n
    if Data.PERSISTENT_CACHE and Data.DCACHE[3] == 1:
        # Cached state is ON: mark it OFF first, so toggle(True) really turns the output on
        Data.DCACHE[3] = 0
        toggle(True)
    return "CACHE: {}, LED CNT: {}".format(Data.PERSISTENT_CACHE, led_count)
def color(r=None, g=None, b=None, smooth=True, force=True):
    """
    Set the same RGB color on every NEOPIXEL element
    :param r: red value 0-255 (None: use cached value)
    :param g: green value 0-255 (None: use cached value)
    :param b: blue value 0-255 (None: use cached value)
    :param smooth: change color with a short blocking fade
    :param force: request stop of the running transition task before setting the color
    :return dict: rgb status - states: R, G, B, S
    """

    def __buttery(r_from, g_from, b_from, r_to, g_to, b_to):
        # Blocking fade between two colors
        if Data.DCACHE[3] == 0:
            # Output is OFF: fade up from black and mark it ON
            r_from, g_from, b_from = 0, 0, 0
            Data.DCACHE[3] = 1
        channels, step_ms = transition_gen(r_from, r_to, g_from, g_to, b_from, b_to, interval_sec=0.2)
        r_gen, g_gen, b_gen = channels[0], channels[1], channels[2]
        # Red generator drives the loop, green and blue are stepped together with it
        for red in r_gen:
            green = next(g_gen)
            blue = next(b_gen)
            for idx in range(__init_NEOPIXEL().n):
                Data.NEOPIXEL_OBJ[idx] = (red, green, blue)
            Data.NEOPIXEL_OBJ.write()
            sleep_ms(step_ms)

    if force:
        # User input has priority: soft kill the transition task
        Data.TASK_STATE = False

    cached_r, cached_g, cached_b = Data.DCACHE[0], Data.DCACHE[1], Data.DCACHE[2]
    if r is None:
        r = cached_r
    if g is None:
        g = cached_g
    if b is None:
        b = cached_b

    if not smooth:
        # Direct write: fill the buffer, then send it to the device
        for idx in range(__init_NEOPIXEL().n):
            Data.NEOPIXEL_OBJ[idx] = (r, g, b)
        Data.NEOPIXEL_OBJ.write()
    else:
        __buttery(r_from=cached_r, g_from=cached_g, b_from=cached_b, r_to=r, g_to=g, b_to=b)
    # Update cache (+ persistent file when enabled)
    __state_machine(r, g, b)
    return status()
def brightness(percent=None, smooth=True, wake=True):
    """
    Set neopixel brightness
    - brightness is derived from the strongest cached channel,
      the other channels are scaled with it to keep the color ratio
    :param percent: (int) brightness percentage: 0-100, None: return actual brightness
    :param smooth: (bool) enable smooth color transition: True(default)/False
    :param wake: bool - wake up output / if off turn on with new brightness
    :return dict: rgb status - states: R, G, B, S
            (str when percent is None or out of range)
    """
    # Get color (channel) max brightness
    ch_max = max(Data.DCACHE[:3])
    # Calculate actual brightness
    actual_percent = ch_max / Data.CH_MAX * 100
    # Return brightness percentage
    if percent is None:
        if Data.DCACHE[3] == 0:
            return "0 %"
        return "{} %".format(actual_percent)
    # Validate input percentage value
    if percent < 0 or percent > 100:
        return "Percent is out of range: 0-100"
    # Percent not changed
    if percent == actual_percent and Data.DCACHE[3] == 1:
        return status()
    # Set brightness percentage
    target_br = Data.CH_MAX * (percent / 100)
    if ch_max > 0:
        # Same formula for every channel: target * (channel / strongest channel)
        new_rgb = tuple(target_br * (ch / ch_max) for ch in Data.DCACHE[:3])
    else:
        # Cached color is black (e.g. after brightness 0 was cached while OFF):
        # no color ratio to keep, so avoid the zero division and use equal channels
        new_rgb = (target_br, target_br, target_br)
    # Update RGB output
    if Data.DCACHE[3] == 1 or wake:
        return color(round(new_rgb[0], 3), round(new_rgb[1], 3), round(new_rgb[2], 3), smooth=smooth)
    # Update cache only! Data.DCACHE[3] == 0 and wake == False
    Data.DCACHE[0] = int(new_rgb[0])
    Data.DCACHE[1] = int(new_rgb[1])
    Data.DCACHE[2] = int(new_rgb[2])
    return status()
def segment(r=None, g=None, b=None, s=0, cache=False, write=True):
    """
    Set single segment by index on neopixel
    :param r: red value 0-255 (None: use cached value)
    :param g: green value 0-255 (None: use cached value)
    :param b: blue value 0-255 (None: use cached value)
    :param s: segment - index 0-(ledcnt-1)
    :param cache: cache color (update .pds file)
    :param write: send color buffer to neopixel (update LEDs)
    :return dict: rgb status - states: R, G, B, S
            (str error message when the index is out of range)
    """
    r = Data.DCACHE[0] if r is None else r
    g = Data.DCACHE[1] if g is None else g
    b = Data.DCACHE[2] if b is None else b
    neo_n = __init_NEOPIXEL().n
    # Valid indexes are 0..n-1: index n is already outside of the LED buffer,
    # negative indexes are rejected as well
    if not 0 <= s < neo_n:
        return "NEOPIXEL index error: {} not in 0-{}".format(s, neo_n - 1)
    Data.NEOPIXEL_OBJ[s] = (r, g, b)
    # Send colors to neopixel
    if write:
        Data.NEOPIXEL_OBJ.write()
    # Cache handling: same cache + save logic as the full color update
    if cache:
        __state_machine(r, g, b)
    return status()
def toggle(state=None, smooth=True):
    """
    Toggle led state based on the stored state
    :param state: True(1)/False(0), None: invert the stored state
    :param smooth: runs colors change with smooth effect
    :return dict: rgb status - states: R, G, B, S
    """
    if state is not None:
        wanted_on = bool(state)
        if wanted_on == bool(Data.DCACHE[3]):
            # Requested state is already active
            return status()
        # Store the opposite of the requested state, the branches below invert it
        Data.DCACHE[3] = 0 if wanted_on else 1

    if Data.DCACHE[3] == 1:
        # Stored state is ON -> turn OFF
        return color(r=0, g=0, b=0, smooth=smooth, force=False)
    if not smooth:
        # Stored state is OFF -> turn ON with the cached color, no fade
        return color(smooth=smooth, force=False)
    # Stored state is OFF -> turn ON with fade: cached color becomes the target,
    # cache is zeroed for the duration of the call
    target = (Data.DCACHE[0], Data.DCACHE[1], Data.DCACHE[2])
    Data.DCACHE[0], Data.DCACHE[1], Data.DCACHE[2] = 0, 0, 0
    return color(target[0], target[1], target[2], smooth=smooth, force=False)
def transition(r=None, g=None, b=None, sec=1.0, wake=False):
    """
    [TASK] Set transition color change for long dimming periods < 30sec
    - creates the dimming generators
    - starts from the cached color (Data.DCACHE), a channel left as None keeps its cached value
    :param r: red channel 0-255
    :param g: green channel 0-255
    :param b: blue channel 0-255
    :param sec: transition length in sec
    :param wake: bool, wake on setup (auto run on periphery)
                 - False: LEDs are only written while the cached state is ON,
                   the color cache is updated in both cases
    :return: info msg string
    """
    async def _task(ms_period, iterable):
        # [!] ASYNC TASK ADAPTER [*2] with automatic state management
        #   [micro_task->Task] TaskManager access to task internals (out, done attributes)
        r_gen, g_gen, b_gen = iterable[0], iterable[1], iterable[2]
        with micro_task(tag=Data.RGB_TASK_TAG) as my_task:
            # Red generator drives the loop, green and blue are stepped together with it
            for r_val in r_gen:
                if not Data.TASK_STATE:
                    # SOFT KILL: flag was cleared (color(force=True) does it on user input)
                    my_task.out = "Cancelled"
                    return
                g_val = next(g_gen)
                b_val = next(b_gen)
                if Data.DCACHE[3] == 1 or wake:
                    # Write periphery
                    for element in range(0, __init_NEOPIXEL().n):  # Iterate over led string elements
                        Data.NEOPIXEL_OBJ[element] = (r_val, g_val, b_val)  # Set LED element color
                    Data.NEOPIXEL_OBJ.write()  # Send data to device
                # Update periphery cache (value check due to toggle ON value minimum)
                Data.DCACHE[0] = r_val if r_val > 5 else 5   # SAVE VALUE TO CACHE > 5 ! because toggle
                Data.DCACHE[1] = g_val if g_val > 5 else 5   # SAVE VALUE TO CACHE > 5 ! because toggle
                Data.DCACHE[2] = b_val if b_val > 5 else 5   # SAVE VALUE TO CACHE > 5 ! because toggle
                my_task.out = f"Dimming: R:{r_val} G:{g_val} B:{b_val}"
                # Yield to the other tasks between two steps
                await asyncio.sleep_ms(ms_period)
            # Transition finished: store the final color + state (and save the cache)
            # NOTE(review): r_val/g_val/b_val are unbound here if the generator yields nothing - confirm transition_gen always yields
            if Data.DCACHE[3] == 1 or wake:
                __state_machine(r=r_val, g=g_val, b=b_val)
            my_task.out = f"Dimming DONE: R:{r_val} G:{g_val} B:{b_val}"

    Data.TASK_STATE = True      # Save transition task is started (kill param to overwrite task with user input)
    r_from, g_from, b_from = Data.DCACHE[0], Data.DCACHE[1], Data.DCACHE[2]
    r_to = Data.DCACHE[0] if r is None else r
    g_to = Data.DCACHE[1] if g is None else g
    b_to = Data.DCACHE[2] if b is None else b
    # Create transition generator and calculate step_ms
    rgb_gen, step_ms = transition_gen(r_from, r_to, g_from, g_to, b_from, b_to, interval_sec=sec)
    # [!] ASYNC TASK CREATION [1*] with async task callback + taskID (TAG) handling
    # presumably micro_task returns a falsy value when a task with this tag already runs - verify in Common
    state = micro_task(tag=Data.RGB_TASK_TAG, task=_task(ms_period=step_ms, iterable=rgb_gen))
    return "Starting transition" if state else "Transition already running"
def random(smooth=True, max_val=255):
    """
    Demo function: implements random color change
    :param smooth: (bool) enable smooth color transition: True(default)/False
    :param max_val: set channel maximum generated value: 0-255
                    (values outside of this range are clamped)
    :return dict: rgb status - states: R, G, B, S
    """
    # Keep the upper bound inside the channel range:
    # a negative bound makes randint raise, a bound above CH_MAX gives invalid channel values
    upper = max(0, min(int(max_val), Data.CH_MAX))
    r = randint(0, upper)
    g = randint(0, upper)
    b = randint(0, upper)
    return color(r, g, b, smooth=smooth)
def subscribe_presence():
    """
    Register toggle based ON/OFF callbacks in the LM presence module
    :return: None
    """
    from LM_presence import _subscribe

    def _turn_on(s=True):
        return toggle(s)

    def _turn_off(s=False):
        return toggle(s)

    _subscribe(on=_turn_on, off=_turn_off)
#######################
# LM helper functions #
#######################
def status(lmf=None):
    """
    [i] micrOS LM naming convention
    Show Load Module state machine
    :param lmf str: selected load module function (not used here, the full state is always returned)
    :return dict: R, G, B, S - values of the color/state cache
    """
    # Neopixel(=RGB) dedicated widget input
    red, green, blue, state = Data.DCACHE[0], Data.DCACHE[1], Data.DCACHE[2], Data.DCACHE[3]
    return {'R': red, 'G': green, 'B': blue, 'S': state}
def pinmap():
    """
    [i] micrOS LM naming convention
    Shows logical pins used by this Load module
    - info which pins to use for this application
    :return: result of the pin lookup for the 'neop' logical pin
             (pin name - pin value pairs by the LM convention)
    """
    used_pins = pinmap_search('neop')
    return used_pins
def help(widgets=False):
    """
    [i] micrOS LM naming convention - built-in help message
    :param widgets: passed through to resolve, selects the output format
    :return tuple:
        (widgets=False) list of functions implemented by this application
        (widgets=True) list of widget json for UI generation
    """
    # One entry per public function, optional leading widget type tag (COLOR/BUTTON/SLIDER)
    functions = (
        'COLOR color r=<0-255-5> g b smooth=True force=True',
        'BUTTON toggle state=<True,False> smooth=True',
        'load ledcnt=24',
        'SLIDER brightness percent=<0-100> smooth=True wake=True',
        'COLOR segment r g b s=<0-n>',
        'transition r g b sec=1.0 wake=False',
        'BUTTON random smooth=True max_val=254',
        'status',
        'subscribe_presence',
        'pinmap',
        'help widgets=False',
    )
    return resolve(functions, widgets=widgets)