Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions homeassistant/components/plex/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
DEFAULT_SSL = False
DEFAULT_VERIFY_SSL = True

DEBOUNCE_TIMEOUT = 1
DISPATCHERS = "dispatchers"
PLATFORMS = frozenset(["media_player", "sensor"])
PLATFORMS_COMPLETED = "platforms_completed"
Expand Down
37 changes: 33 additions & 4 deletions homeassistant/components/plex/server.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Shared class to maintain Plex server instances."""
from functools import partial, wraps
import logging
import ssl
from urllib.parse import urlparse
Expand All @@ -12,13 +13,15 @@
from homeassistant.components.media_player import DOMAIN as MP_DOMAIN
from homeassistant.const import CONF_TOKEN, CONF_URL, CONF_VERIFY_SSL
from homeassistant.helpers.dispatcher import dispatcher_send
from homeassistant.helpers.event import async_call_later

from .const import (
CONF_CLIENT_IDENTIFIER,
CONF_IGNORE_NEW_SHARED_USERS,
CONF_MONITORED_USERS,
CONF_SERVER,
CONF_USE_EPISODE_ART,
DEBOUNCE_TIMEOUT,
DEFAULT_VERIFY_SSL,
PLEX_NEW_MP_SIGNAL,
PLEX_UPDATE_MEDIA_PLAYER_SIGNAL,
Expand All @@ -39,12 +42,37 @@
plexapi.X_PLEX_VERSION = X_PLEX_VERSION


def debounce(func):
"""Decorate function to debounce callbacks from Plex websocket."""

unsub = None

async def call_later_listener(self, _):
"""Handle call_later callback."""
nonlocal unsub
unsub = None
await self.hass.async_add_executor_job(func, self)

@wraps(func)
def wrapper(self):
"""Schedule async callback."""
nonlocal unsub
if unsub:
_LOGGER.debug("Throttling update of %s", self.friendly_name)
unsub() # pylint: disable=not-callable
Comment thread
MartinHjelmare marked this conversation as resolved.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unsub functions should only be called from the same context as in which they are set. So if created with async_call_later, need to be called from inside the event loop.

Also, because this is running in a thread, you cannot guarantee that nothing else touches it. You should instead run this all inside the event loop.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to turn this the wrapper function into an async function.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really sorry about that, hopefully fixed in #33730.

unsub = async_call_later(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is calling an async method from a sync context.

self.hass, DEBOUNCE_TIMEOUT, partial(call_later_listener, self),
Comment thread
jjlawren marked this conversation as resolved.
)

return wrapper


class PlexServer:
"""Manages a single Plex server connection."""

def __init__(self, hass, server_config, known_server_id=None, options=None):
"""Initialize a Plex server instance."""
self._hass = hass
self.hass = hass
self._plex_server = None
self._known_clients = set()
self._known_idle = set()
Expand Down Expand Up @@ -150,12 +178,13 @@ def refresh_entity(self, machine_identifier, device, session):
unique_id = f"{self.machine_identifier}:{machine_identifier}"
_LOGGER.debug("Refreshing %s", unique_id)
dispatcher_send(
self._hass,
self.hass,
PLEX_UPDATE_MEDIA_PLAYER_SIGNAL.format(unique_id),
device,
session,
)

@debounce
def update_platforms(self):
"""Update the platform entities."""
_LOGGER.debug("Updating devices")
Expand Down Expand Up @@ -239,13 +268,13 @@ def update_platforms(self):

if new_entity_configs:
dispatcher_send(
self._hass,
self.hass,
PLEX_NEW_MP_SIGNAL.format(self.machine_identifier),
new_entity_configs,
)

dispatcher_send(
self._hass,
self.hass,
PLEX_UPDATE_SENSOR_SIGNAL.format(self.machine_identifier),
sessions,
)
Expand Down
20 changes: 20 additions & 0 deletions tests/components/plex/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""Common fixtures and functions for Plex tests."""
from datetime import timedelta

from homeassistant.components.plex.const import (
DEBOUNCE_TIMEOUT,
PLEX_UPDATE_PLATFORMS_SIGNAL,
)
from homeassistant.helpers.dispatcher import async_dispatcher_send
import homeassistant.util.dt as dt_util

from tests.common import async_fire_time_changed


async def trigger_plex_update(hass, server_id):
"""Update Plex by sending signal and jumping ahead by debounce timeout."""
async_dispatcher_send(hass, PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
await hass.async_block_till_done()
next_update = dt_util.utcnow() + timedelta(seconds=DEBOUNCE_TIMEOUT)
async_fire_time_changed(hass, next_update)
await hass.async_block_till_done()
6 changes: 2 additions & 4 deletions tests/components/plex/test_config_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@
CONF_USE_EPISODE_ART,
DOMAIN,
PLEX_SERVER_CONFIG,
PLEX_UPDATE_PLATFORMS_SIGNAL,
SERVERS,
)
from homeassistant.config_entries import ENTRY_STATE_LOADED
from homeassistant.const import CONF_HOST, CONF_PORT, CONF_TOKEN, CONF_URL
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.setup import async_setup_component

from .common import trigger_plex_update
from .const import DEFAULT_DATA, DEFAULT_OPTIONS, MOCK_SERVERS, MOCK_TOKEN
from .mock_classes import MockPlexAccount, MockPlexServer

Expand Down Expand Up @@ -416,8 +415,7 @@ async def test_option_flow_new_users_available(hass, caplog):

server_id = mock_plex_server.machineIdentifier

async_dispatcher_send(hass, PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
await hass.async_block_till_done()
await trigger_plex_update(hass, server_id)

monitored_users = hass.data[DOMAIN][SERVERS][server_id].option_monitored_users

Expand Down
36 changes: 18 additions & 18 deletions tests/components/plex/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
CONF_URL,
CONF_VERIFY_SSL,
)
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util

from .common import trigger_plex_update
from .const import DEFAULT_DATA, DEFAULT_OPTIONS, MOCK_SERVERS, MOCK_TOKEN
from .mock_classes import MockPlexAccount, MockPlexServer

Expand Down Expand Up @@ -74,7 +74,7 @@ async def test_setup_with_config(hass):
)


async def test_setup_with_config_entry(hass):
async def test_setup_with_config_entry(hass, caplog):
"""Test setup component with config."""

mock_plex_server = MockPlexServer()
Expand Down Expand Up @@ -109,30 +109,31 @@ async def test_setup_with_config_entry(hass):
hass.data[const.DOMAIN][const.PLATFORMS_COMPLETED][server_id] == const.PLATFORMS
)

async_dispatcher_send(hass, const.PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
await hass.async_block_till_done()
await trigger_plex_update(hass, server_id)

sensor = hass.states.get("sensor.plex_plex_server_1")
assert sensor.state == str(len(mock_plex_server.accounts))

async_dispatcher_send(hass, const.PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
await hass.async_block_till_done()
await trigger_plex_update(hass, server_id)

with patch.object(
mock_plex_server, "clients", side_effect=plexapi.exceptions.BadRequest
):
async_dispatcher_send(
hass, const.PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id)
)
await hass.async_block_till_done()
) as patched_clients_bad_request:
await trigger_plex_update(hass, server_id)

assert patched_clients_bad_request.called
assert "Error requesting Plex client data from server" in caplog.text

with patch.object(
mock_plex_server, "clients", side_effect=requests.exceptions.RequestException
):
async_dispatcher_send(
hass, const.PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id)
)
await hass.async_block_till_done()
) as patched_clients_requests_exception:
await trigger_plex_update(hass, server_id)

assert patched_clients_requests_exception.called
assert (
f"Could not connect to Plex server: {mock_plex_server.friendlyName}"
in caplog.text
)


async def test_set_config_entry_unique_id(hass):
Expand Down Expand Up @@ -294,8 +295,7 @@ async def test_setup_with_photo_session(hass):

server_id = mock_plex_server.machineIdentifier

async_dispatcher_send(hass, const.PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
await hass.async_block_till_done()
await trigger_plex_update(hass, server_id)

media_player = hass.states.get("media_player.plex_product_title")
assert media_player.state == "idle"
Expand Down
55 changes: 46 additions & 9 deletions tests/components/plex/test_server.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
"""Tests for Plex server."""
import copy
from datetime import timedelta

from asynctest import patch

from homeassistant.components.media_player import DOMAIN as MP_DOMAIN
from homeassistant.components.plex.const import (
CONF_IGNORE_NEW_SHARED_USERS,
CONF_MONITORED_USERS,
DEBOUNCE_TIMEOUT,
DOMAIN,
PLEX_UPDATE_PLATFORMS_SIGNAL,
SERVERS,
)
from homeassistant.helpers.dispatcher import async_dispatcher_send
import homeassistant.util.dt as dt_util

from .common import trigger_plex_update
from .const import DEFAULT_DATA, DEFAULT_OPTIONS
from .mock_classes import MockPlexServer

from tests.common import MockConfigEntry
from tests.common import MockConfigEntry, async_fire_time_changed


async def test_new_users_available(hass):
Expand Down Expand Up @@ -44,8 +48,7 @@ async def test_new_users_available(hass):

server_id = mock_plex_server.machineIdentifier

async_dispatcher_send(hass, PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
await hass.async_block_till_done()
await trigger_plex_update(hass, server_id)

monitored_users = hass.data[DOMAIN][SERVERS][server_id].option_monitored_users

Expand Down Expand Up @@ -83,8 +86,7 @@ async def test_new_ignored_users_available(hass, caplog):

server_id = mock_plex_server.machineIdentifier

async_dispatcher_send(hass, PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
await hass.async_block_till_done()
await trigger_plex_update(hass, server_id)

monitored_users = hass.data[DOMAIN][SERVERS][server_id].option_monitored_users

Expand Down Expand Up @@ -118,17 +120,52 @@ async def test_mark_sessions_idle(hass):

server_id = mock_plex_server.machineIdentifier

async_dispatcher_send(hass, PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
await hass.async_block_till_done()
await trigger_plex_update(hass, server_id)

sensor = hass.states.get("sensor.plex_plex_server_1")
assert sensor.state == str(len(mock_plex_server.accounts))

mock_plex_server.clear_clients()
mock_plex_server.clear_sessions()

async_dispatcher_send(hass, PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
await hass.async_block_till_done()
await trigger_plex_update(hass, server_id)

sensor = hass.states.get("sensor.plex_plex_server_1")
assert sensor.state == "0"


async def test_debouncer(hass, caplog):
"""Test debouncer decorator logic."""
entry = MockConfigEntry(
domain=DOMAIN,
data=DEFAULT_DATA,
options=DEFAULT_OPTIONS,
unique_id=DEFAULT_DATA["server_id"],
)

mock_plex_server = MockPlexServer(config_entry=entry)

with patch("plexapi.server.PlexServer", return_value=mock_plex_server), patch(
"homeassistant.components.plex.PlexWebsocket.listen"
):
entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()

server_id = mock_plex_server.machineIdentifier

# First two updates are skipped
async_dispatcher_send(hass, PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
await hass.async_block_till_done()
async_dispatcher_send(hass, PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
await hass.async_block_till_done()
async_dispatcher_send(hass, PLEX_UPDATE_PLATFORMS_SIGNAL.format(server_id))
await hass.async_block_till_done()

next_update = dt_util.utcnow() + timedelta(seconds=DEBOUNCE_TIMEOUT)
async_fire_time_changed(hass, next_update)
await hass.async_block_till_done()

assert (
caplog.text.count(f"Throttling update of {mock_plex_server.friendlyName}") == 2
)