-
-
Notifications
You must be signed in to change notification settings - Fork 222
/
listens_dispatcher.py
97 lines (83 loc) · 4.07 KB
/
listens_dispatcher.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import json
import time
from kombu.mixins import ConsumerMixin
from listenbrainz.listen import Listen, NowPlayingListen
from listenbrainz.utils import get_fallback_connection_name
from kombu import Connection, Exchange, Queue, Consumer
class ListensDispatcher(ConsumerMixin):
    """Consume messages from RabbitMQ and forward them to clients through socketio.

    Three queues are consumed, each mapped to one emitted event name:
    the websockets queue (``listen``), the playing now queue (``playing_now``)
    and the instant stats queue (``instant_stats``).
    """

    def __init__(self, app, socketio):
        """Store the collaborators and declare the exchanges and queues to consume.

        Args:
            app: application object; ``app.config`` supplies the exchange/queue
                names and the RabbitMQ connection settings, ``app.logger`` is
                used for logging.
            socketio: object whose ``emit`` method is used to push events.
        """
        self.app = app
        self.socketio = socketio
        self.connection = None

        # there are three consumers, so we need three channels: one for the normal listens
        # queue, one for the playing now queue and one for the instant stats queue. when using
        # ConsumerMixin, it sets up a default channel itself. we create the other two channels
        # in get_consumers and clean them up in on_consume_end.
        self.playing_now_channel = None
        self.instant_stats_channel = None

        config = app.config
        self.unique_exchange = Exchange(config["UNIQUE_EXCHANGE"], "fanout", durable=False)
        self.playing_now_exchange = Exchange(config["PLAYING_NOW_EXCHANGE"], "fanout", durable=False)
        self.instant_stats_exchange = Exchange(config["INSTANT_SPARK_RESULT_EXCHANGE"], "fanout", durable=True)

        self.websockets_queue = Queue(config["WEBSOCKETS_QUEUE"], exchange=self.unique_exchange, durable=True)
        self.playing_now_queue = Queue(
            config["PLAYING_NOW_QUEUE"],
            exchange=self.playing_now_exchange,
            durable=True,
        )
        self.instant_stats_queue = Queue(
            config["INSTANT_SPARK_RESULT_WEBSOCKETS_QUEUE"],
            exchange=self.instant_stats_exchange,
            durable=True,
        )

    def send_listens(self, event_name, message):
        """Emit every listen contained in ``message`` and acknowledge the message.

        Args:
            event_name: socketio event name; ``"playing_now"`` selects
                ``NowPlayingListen``, any other value uses ``Listen.from_json``.
            message: received message whose ``body`` is a JSON array of listens.
        """
        listens = json.loads(message.body)
        for data in listens:
            if event_name == "playing_now":
                listen = NowPlayingListen(
                    user_id=data["user_id"],
                    user_name=data["user_name"],
                    data=data["track_metadata"],
                )
            else:
                listen = Listen.from_json(data)
            # each listen is emitted only to the room named after its user
            self.socketio.emit(event_name, json.dumps(listen.to_api()), to=listen.user_name)
        message.ack()

    def send_instant_stats(self, message):
        """Emit one ``instant_stats`` event per user in ``message`` and acknowledge it.

        Args:
            message: received message whose ``body`` is a JSON object with the
                keys ``data`` (list of per-user objects, each with a
                ``user_id``), ``from_ts``, ``to_ts`` and ``entity``.
        """
        body = json.loads(message.body)
        for user in body["data"]:
            # copy the message-level fields onto each per-user payload
            user["from_ts"] = body["from_ts"]
            user["to_ts"] = body["to_ts"]
            user["entity"] = body["entity"]
            self.socketio.emit("instant_stats", json.dumps(user), to=f"{user['user_id']}/instant_stats")
        # acknowledge like send_listens does; the consumer is not created with
        # no_ack, so an unacknowledged message would be delivered again
        message.ack()

    def get_consumers(self, _, channel):
        """Build the three consumers, creating the two extra channels they need.

        Args:
            _: unused (the consumer class passed in by ConsumerMixin).
            channel: default channel; used for the listens consumer and to reach
                the underlying connection for opening the additional channels.

        Returns:
            List with the listens, playing now and instant stats consumers.
        """
        self.playing_now_channel = channel.connection.channel()
        # the instant stats consumer needs a real channel as well; previously
        # this attribute was still None when handed to Consumer
        self.instant_stats_channel = channel.connection.channel()
        return [
            Consumer(
                channel,
                queues=[self.websockets_queue],
                on_message=lambda x: self.send_listens("listen", x)
            ),
            Consumer(
                self.playing_now_channel,
                queues=[self.playing_now_queue],
                on_message=lambda x: self.send_listens("playing_now", x)
            ),
            Consumer(
                self.instant_stats_channel,
                queues=[self.instant_stats_queue],
                on_message=lambda x: self.send_instant_stats(x)
            )
        ]

    def on_consume_end(self, connection, default_channel):
        """Close the channels that get_consumers opened in addition to the default one."""
        if self.playing_now_channel:
            self.playing_now_channel.close()
        if self.instant_stats_channel:
            self.instant_stats_channel.close()

    def init_rabbitmq_connection(self):
        """Create the RabbitMQ ``Connection`` from ``app.config`` and store it on ``self.connection``."""
        config = self.app.config
        self.connection = Connection(
            hostname=config["RABBITMQ_HOST"],
            userid=config["RABBITMQ_USERNAME"],
            port=config["RABBITMQ_PORT"],
            password=config["RABBITMQ_PASSWORD"],
            virtual_host=config["RABBITMQ_VHOST"],
            transport_options={"client_properties": {"connection_name": get_fallback_connection_name()}}
        )

    def start(self):
        """Run the consumer loop until interrupted from the keyboard.

        Any other exception is logged and, after a 3 second pause, a fresh
        connection is created and consuming is restarted.
        """
        # NOTE(review): the log texts below mention "player writer" although this
        # class is ListensDispatcher; they are kept unchanged here - confirm
        # whether anything depends on them before rewording.
        while True:
            try:
                self.app.logger.info("Starting player writer...")
                self.init_rabbitmq_connection()
                self.run()
            except KeyboardInterrupt:
                self.app.logger.error("Keyboard interrupt!")
                break
            except Exception:
                self.app.logger.error("Error in PlayerWriter:", exc_info=True)
                time.sleep(3)