# MenorahOperations.py
import os
import random
import threading
import time
from enum import Enum

from flask_socketio import SocketIO

# Import the GPIO library only if not in local development environment
if not os.getenv('LOCAL_DEV'):
    import RPi.GPIO as GPIO
class Mode(Enum):
    """Operating modes of the menorah.

    ``Interactive`` has no background effect; the other three modes each
    correspond to an animated effect run by ``MenorahOperations``.
    """

    Interactive = 0  # candles are set individually, no effect thread
    Strobe = 1       # all candles flash together
    Wave = 2         # candles light one after another
    Random = 3       # randomly chosen candles blink
class MenorahOperations:
    """Control nine menorah candles through GPIO pins and publish their state.

    Each candle number (1-9) maps to one output pin.  Besides direct,
    per-candle control the class offers three animated effects (strobe, wave
    and random), each run by its own daemon thread.  State changes can be
    pushed to clients as a ``candle_states`` Socket.IO event.

    When the ``LOCAL_DEV`` environment variable is set, no GPIO call is made
    and only the in-memory state list is updated.
    """

    # Seconds a stop_* method waits for its effect thread to exit.
    _JOIN_TIMEOUT = 1.0

    def __init__(self, socket: "SocketIO"):
        """Set up the pins (outside local development) and mark every candle on.

        Args:
            socket: Flask-SocketIO object whose ``emit`` is used to publish
                candle states.
        """
        self.local_dev = bool(os.getenv('LOCAL_DEV'))
        self.socket = socket

        # Candle number -> pin number (BOARD numbering, see setmode below).
        self.pin_map = {1: 16, 2: 11, 3: 13, 4: 22, 5: 18, 6: 36, 7: 12, 8: 15, 9: 29}

        if not self.local_dev:
            GPIO.setmode(GPIO.BOARD)
            for pin in self.pin_map.values():
                GPIO.setup(pin, GPIO.OUT)
                GPIO.output(pin, GPIO.HIGH)

        # states[i] holds the state of candle i + 1; all candles start on.
        self.states = [True] * len(self.pin_map)
        self.mode = Mode.Interactive

        # Strobe effect
        self.strobe_delay = 0.5
        self.strobe_running = False
        self.strobe_thread = None
        self._strobe_stop = None

        # Wave effect
        self.wave_running = False
        self.wave_thread = None
        self._wave_stop = None
        self.wave_delay = 0.07  # Adjust the speed of the wave here

        # Random effect
        self.random_running = False
        self.random_thread = None
        self._random_stop = None
        self.random_delay = 0.07

    def set_mode(self, mode: "Mode"):
        """Switch to ``mode``: stop the current effect, then start the new one.

        Does nothing when ``mode`` is already active.  Entering strobe mode
        calls ``start_strobe`` with its default delay.

        Raises:
            KeyError: if ``mode`` is not a ``Mode`` member.  The lookup happens
                before anything is stopped, so the current mode keeps running.
        """
        if mode == self.mode:
            return

        cleanup = {
            Mode.Interactive: None,
            Mode.Strobe: self.stop_strobe,
            Mode.Wave: self.stop_wave,
            Mode.Random: self.stop_random,
        }
        startup = {
            Mode.Interactive: None,
            Mode.Strobe: self.start_strobe,
            Mode.Wave: self.start_wave,
            Mode.Random: self.start_random,
        }

        # Resolve both handlers first so an unknown mode cannot leave the
        # old effect stopped while self.mode points at an invalid value.
        stop_current = cleanup[self.mode]
        start_next = startup[mode]

        if stop_current:
            stop_current()
        self.mode = mode
        if start_next:
            start_next()

    def set_candle_state(self, candle_number: int, state: bool, emit_socket=True):
        """Turn one candle on or off.

        Args:
            candle_number: candle to change; must be a key of ``pin_map`` (1-9).
            state: ``True`` drives the pin HIGH, ``False`` drives it LOW.
            emit_socket: when true, publish the new states afterwards.

        Raises:
            IndexError: if ``candle_number`` is not a known candle.  Nothing is
                changed in that case (without this check, 0 and negative
                numbers would silently address a candle from the end of the
                state list).
        """
        if candle_number not in self.pin_map:
            raise IndexError(
                f"unknown candle number {candle_number!r}; "
                f"expected one of {sorted(self.pin_map)}"
            )
        if not self.local_dev:
            GPIO.output(self.pin_map[candle_number], GPIO.HIGH if state else GPIO.LOW)
        self.states[candle_number - 1] = state
        if emit_socket:
            self.emit_socket()

    def emit_socket(self):
        """Emit the current candle states as a ``candle_states`` event (no room specified)."""
        self.socket.emit('candle_states', self.get_candles_states(), room=None)

    def set_all_candles(self, state: bool = True):
        """Set every candle to ``state``.  Does not emit a socket event."""
        if not self.local_dev:
            level = GPIO.HIGH if state else GPIO.LOW
            for pin in self.pin_map.values():
                GPIO.output(pin, level)
        self.states = [state] * len(self.pin_map)

    def _reset_candles(self):
        """Turn every candle off.  Does not emit a socket event."""
        self.set_all_candles(False)

    def get_candles_states(self):
        """Return a copy of the candle states; index ``i`` is candle ``i + 1``."""
        # A copy keeps callers from mutating the internal list behind the
        # back of the effect threads.
        return list(self.states)

    # ------------------------------------------------------------------
    # Effect threads
    #
    # Each run of an effect gets its own threading.Event.  The loops pause
    # with Event.wait() instead of time.sleep(), so a stop request ends the
    # pause immediately, and a thread from an earlier run can never be
    # revived when the shared *_running flag is set to True again.
    # ------------------------------------------------------------------

    def _finish(self, worker, stop_event):
        """Signal ``stop_event`` and wait up to ``_JOIN_TIMEOUT`` for ``worker``."""
        if stop_event is not None:
            stop_event.set()
        if worker is not None:
            worker.join(timeout=self._JOIN_TIMEOUT)  # Timeout to avoid indefinite waiting

    def start_strobe(self, delay=0.3):
        """Start the strobe effect, or just update its delay if already running.

        Args:
            delay: seconds the candles stay on per flash; values below 0.1
                are raised to 0.1.
        """
        self.strobe_delay = max(0.1, delay)
        if self.strobe_running:
            return
        self.strobe_running = True
        self._strobe_stop = threading.Event()
        self.strobe_thread = threading.Thread(
            target=self._strobe_effect, args=(self._strobe_stop,), daemon=True
        )
        self.strobe_thread.start()

    def stop_strobe(self):
        """Stop the strobe effect and wait briefly for its thread to exit."""
        self.strobe_running = False
        self._finish(self.strobe_thread, self._strobe_stop)
        self.strobe_thread = None
        self._strobe_stop = None

    def _strobe_effect(self, stop_event=None):
        """Flash all candles together until stopped.

        Args:
            stop_event: event that ends the loop when set.  When omitted, only
                the ``strobe_running`` flag controls the loop.
        """
        if stop_event is None:
            stop_event = threading.Event()
        while self.strobe_running and not stop_event.is_set():
            for candle_num in self.pin_map:
                self.set_candle_state(candle_num, True, emit_socket=False)
            self.emit_socket()
            stop_event.wait(self.strobe_delay)  # Strobe speed

            # Always finish the cycle so the effect ends with the candles off;
            # once a stop is requested the waits return immediately.
            for candle_num in self.pin_map:
                self.set_candle_state(candle_num, False, emit_socket=False)
            self.emit_socket()
            stop_event.wait(self.strobe_delay * 0.2)

    def start_wave(self):
        """Start the wave effect unless it is already running."""
        if self.wave_running:
            return
        self.wave_running = True
        self._wave_stop = threading.Event()
        self.wave_thread = threading.Thread(
            target=self._wave_effect, args=(self._wave_stop,), daemon=True
        )
        self.wave_thread.start()

    def stop_wave(self):
        """Stop the wave effect and wait briefly for its thread to exit."""
        self.wave_running = False
        self._finish(self.wave_thread, self._wave_stop)
        self.wave_thread = None
        self._wave_stop = None

    def _wave_effect(self, stop_event=None):
        """Light the candles one after another, in ``pin_map`` order, until stopped.

        Args:
            stop_event: event that ends the loop when set.  When omitted, only
                the ``wave_running`` flag controls the loop.
        """
        if stop_event is None:
            stop_event = threading.Event()
        while self.wave_running and not stop_event.is_set():
            for candle_num in self.pin_map:
                self.set_candle_state(candle_num, True)
                stop_event.wait(self.wave_delay)
                self.set_candle_state(candle_num, False)
                if stop_event.is_set():
                    # Abandon the rest of the sweep; the current candle is
                    # already off again.
                    break

    def start_random(self):
        """Start the random effect unless it is already running."""
        if self.random_running:
            return
        self.random_running = True
        self._random_stop = threading.Event()
        self.random_thread = threading.Thread(
            target=self._random_effect, args=(self._random_stop,), daemon=True
        )
        self.random_thread.start()

    def stop_random(self):
        """Stop the random effect and wait briefly for its thread to exit."""
        self.random_running = False
        self._finish(self.random_thread, self._random_stop)
        self.random_thread = None
        self._random_stop = None

    def _random_effect(self, stop_event=None):
        """Blink one randomly chosen candle at a time until stopped.

        Args:
            stop_event: event that ends the loop when set.  When omitted, only
                the ``random_running`` flag controls the loop.
        """
        if stop_event is None:
            stop_event = threading.Event()
        while self.random_running and not stop_event.is_set():
            # Randomly select a candle number
            candle_num = random.choice(list(self.pin_map))
            # Switch the selected candle on, then off again
            self.set_candle_state(candle_num, True)
            stop_event.wait(self.random_delay)  # Control the speed of toggling
            self.set_candle_state(candle_num, False)
            # Wait a bit before toggling the next random candle
            stop_event.wait(self.random_delay)