Skip to content

Commit

Permalink
Add alert processing thread (#53)
Browse files Browse the repository at this point in the history
  • Loading branch information
RemiRigal authored Jul 19, 2022
1 parent 8108dd8 commit a032e92
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 24 deletions.
1 change: 1 addition & 0 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def start(self):
self.must_stop = True
self.alive = False
self.plex.save_cache()
self.plex.stop()
if not self.stop_signal:
sleep(1)
logger.info("Trying to restore the connection to the Plex server...")
Expand Down
4 changes: 4 additions & 0 deletions plex_auto_languages/alerts/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,9 @@ class PlexAlert():
def __init__(self, message: dict):
self._message = message

@property
def message(self):
return self._message

def process(self, plex: PlexServer):
raise NotImplementedError
29 changes: 25 additions & 4 deletions plex_auto_languages/plex_alert_handler.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from queue import Queue, Empty
from threading import Thread, Event
from plex_auto_languages.alerts import PlexActivity, PlexTimeline, PlexPlaying, PlexStatus
from plex_auto_languages.utils.logger import get_logger

Expand All @@ -17,6 +19,15 @@ def __init__(self, plex: PlexServer, trigger_on_play: bool, trigger_on_scan: boo
self._trigger_on_play = trigger_on_play
self._trigger_on_scan = trigger_on_scan
self._trigger_on_activity = trigger_on_activity
self._alerts_queue = Queue()
self._stop_event = Event()
self._processor_thread = Thread(target=self._process_alerts)
self._processor_thread.setDaemon(True)
self._processor_thread.start()

def stop(self):
self._stop_event.set()
self._processor_thread.join()

def __call__(self, message: dict):
alert_class = None
Expand All @@ -39,8 +50,18 @@ def __call__(self, message: dict):

for alert_message in message[alert_field]:
alert = alert_class(alert_message)
self._alerts_queue.put(alert)

def _process_alerts(self):
logger.debug("Starting alert processing thread")
while not self._stop_event.is_set():
try:
alert.process(self._plex)
except Exception:
logger.exception(f"Unable to process {alert_class.TYPE}")
logger.debug(message)
alert = self._alerts_queue.get(True, 1)
try:
alert.process(self._plex)
except Exception:
logger.exception(f"Unable to process {alert.TYPE}")
logger.debug(alert.message)
except Empty:
pass
logger.debug("Stopping alert processing thread")
4 changes: 4 additions & 0 deletions plex_auto_languages/plex_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,3 +278,7 @@ def start_deep_analysis(self):
continue
logger.info(f"[Scheduler] Processing updated episode {self.get_episode_short_name(item)}")
self.process_new_or_updated_episode(item.key, EventType.SCHEDULER, False)

def stop(self):
if self._alert_handler:
self._alert_handler.stop()
1 change: 1 addition & 0 deletions tests/test_alerts_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@
def test_alert(plex):
alert = PlexAlert({})
assert alert._message == {}
assert alert.message == alert._message
with pytest.raises(NotImplementedError):
alert.process(plex)
66 changes: 46 additions & 20 deletions tests/test_plex_alert_handler.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import time
from queue import Queue
from logging import Logger
from unittest.mock import patch

Expand All @@ -9,46 +11,70 @@ def test_plex_alert_handler():
handler = PlexAlertHandler(None, True, True, True)

playing_alert = {"type": PlexPlaying.TYPE, "PlaySessionStateNotification": [{}]}
with patch.object(PlexPlaying, "process") as mocked_process:
with patch.object(Queue, "put") as mocked_put:
handler(playing_alert)
mocked_process.assert_called_once()
mocked_put.assert_called_once()

mocked_process.reset_mock()
mocked_put.reset_mock()
del playing_alert["PlaySessionStateNotification"]
handler(playing_alert)
mocked_process.assert_not_called()
mocked_put.assert_not_called()

timeline_alert = {"type": PlexTimeline.TYPE, "TimelineEntry": [{}]}
with patch.object(PlexTimeline, "process") as mocked_process:
with patch.object(Queue, "put") as mocked_put:
handler(timeline_alert)
mocked_process.assert_called_once()
mocked_put.assert_called_once()

mocked_process.reset_mock()
mocked_put.reset_mock()
del timeline_alert["TimelineEntry"]
handler(timeline_alert)
mocked_process.assert_not_called()
mocked_put.assert_not_called()

status_alert = {"type": PlexStatus.TYPE, "StatusNotification": [{}]}
with patch.object(PlexStatus, "process") as mocked_process:
with patch.object(Queue, "put") as mocked_put:
handler(status_alert)
mocked_process.assert_called_once()
mocked_put.assert_called_once()

mocked_process.reset_mock()
mocked_put.reset_mock()
del status_alert["StatusNotification"]
handler(status_alert)
mocked_process.assert_not_called()
mocked_put.assert_not_called()

activity_alert = {"type": PlexActivity.TYPE, "ActivityNotification": [{}]}
with patch.object(PlexActivity, "process") as mocked_process:
with patch.object(Queue, "put") as mocked_put:
handler(activity_alert)
mocked_process.assert_called_once()
mocked_put.assert_called_once()

mocked_process.reset_mock()
mocked_put.reset_mock()
del activity_alert["ActivityNotification"]
handler(activity_alert)
mocked_process.assert_not_called()
mocked_put.assert_not_called()

with patch.object(Logger, "debug") as mocked_debug:
status_alert = {"type": PlexStatus.TYPE, "StatusNotification": [{"title": "Library scan complete"}]}
handler(status_alert)
mocked_debug.assert_called_with(status_alert)
handler.stop()


def test_alert_processing():
handler = PlexAlertHandler(None, True, True, True)

status_alert = PlexStatus({"type": PlexStatus.TYPE, "StatusNotification": [{}]})
with patch.object(PlexStatus, "process") as mocked_process:
handler._alerts_queue.put(status_alert)
time.sleep(1)
mocked_process.assert_called_once()

status_alert = PlexStatus({"type": PlexStatus.TYPE, "StatusNotification": [{"title": "Library scan complete"}]})
with patch.object(PlexStatus, "process", side_effect=Exception()) as mocked_process:
with patch.object(Logger, "debug") as mocked_debug:
handler._alerts_queue.put(status_alert)
time.sleep(1)
mocked_debug.assert_called_with(status_alert.message)

handler.stop()


def test_processor_thread():
handler = PlexAlertHandler(None, True, True, True)
assert handler._processor_thread.is_alive() is True

handler.stop()
assert handler._processor_thread.is_alive() is False
1 change: 1 addition & 0 deletions tests/test_plex_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ def test_start_alert_listener(plex):
assert plex.is_alive is True
plex._alert_listener.stop()
plex._alert_listener.join()
plex.stop()
time.sleep(1)
assert plex.is_alive is False

Expand Down

0 comments on commit a032e92

Please sign in to comment.