This repository was archived by the owner on Oct 16, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbuttplug_thread.py
48 lines (44 loc) · 1.51 KB
/
buttplug_thread.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import asyncio
import time
from queue import Queue, Empty
from threading import Thread
from buttplug.client import ButtplugClientDevice
from patterns import Stop
class Buttplug_Thread(Thread):
def __init__(self, queue: Queue, device: ButtplugClientDevice, queue2: Queue):
self.queue = queue
self.patterns: list = []
self.device = device
self.queue2 = queue2
super().__init__()
def run(self):
while True:
#print(self.patterns)
try:
pattern = self.queue.get_nowait()
#print("got", pattern)
if isinstance(pattern, Stop):
#print("Got stop pattern.")
self.patterns.clear()
self.queue2.put("stop")
else:
self.patterns.insert(0, pattern)
except Empty:
pass
if len(self.patterns) > 0:
speed = self.patterns[0].next()
if speed is None:
self.patterns.pop(0)
continue
self.queue2.put(speed)
else:
self.queue2.put("stop")
time.sleep(0.25)
#coroutine = self.true_function()
#loop = asyncio.new_event_loop()
#asyncio.set_event_loop(loop)
#loop2 = asyncio.get_event_loop()
#try:
# loop.run_until_complete(coroutine)
#except KeyboardInterrupt:
# print("BP THREAD: Received exit, exiting")