Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 148 additions & 95 deletions homeassistant/components/ring/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@
from functools import partial
import logging
from pathlib import Path
from time import time
from typing import Optional

from oauthlib.oauth2 import AccessDeniedError
from ring_doorbell import Auth, Ring
import voluptuous as vol

from homeassistant import config_entries
from homeassistant.const import CONF_PASSWORD, CONF_USERNAME, __version__
from homeassistant.core import callback
from homeassistant.core import HomeAssistant, callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_send, dispatcher_send
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.util.async_ import run_callback_threadsafe

Expand All @@ -24,16 +24,8 @@
NOTIFICATION_ID = "ring_notification"
NOTIFICATION_TITLE = "Ring Setup"

DATA_HISTORY = "ring_history"
DATA_HEALTH_DATA_TRACKER = "ring_health_data"
DATA_TRACK_INTERVAL = "ring_track_interval"

DOMAIN = "ring"
DEFAULT_ENTITY_NAMESPACE = "ring"
SIGNAL_UPDATE_RING = "ring_update"
SIGNAL_UPDATE_HEALTH_RING = "ring_health_update"

SCAN_INTERVAL = timedelta(seconds=10)

PLATFORMS = ("binary_sensor", "light", "sensor", "switch", "camera")

Expand Down Expand Up @@ -93,9 +85,36 @@ def token_updater(token):
auth = Auth(f"HomeAssistant/{__version__}", entry.data["token"], token_updater)
ring = Ring(auth)

await hass.async_add_executor_job(ring.update_data)
try:
await hass.async_add_executor_job(ring.update_data)
except AccessDeniedError:
_LOGGER.error("Access token is no longer valid. Please set up Ring again")
return False

hass.data.setdefault(DOMAIN, {})[entry.entry_id] = ring
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = {
"api": ring,
"devices": ring.devices(),
"device_data": GlobalDataUpdater(
hass, entry.entry_id, ring, "update_devices", timedelta(minutes=1)
),
"dings_data": GlobalDataUpdater(
hass, entry.entry_id, ring, "update_dings", timedelta(seconds=10)
),
"history_data": DeviceDataUpdater(
hass,
entry.entry_id,
ring,
lambda device: device.history(limit=10),
timedelta(minutes=1),
),
"health_data": DeviceDataUpdater(
hass,
entry.entry_id,
ring,
lambda device: device.update_health_data(),
timedelta(minutes=1),
),
}

for component in PLATFORMS:
hass.async_create_task(
Expand All @@ -105,25 +124,16 @@ def token_updater(token):
if hass.services.has_service(DOMAIN, "update"):
return True

async def refresh_all(_):
"""Refresh all ring accounts."""
await asyncio.gather(
*[
hass.async_add_executor_job(api.update_data)
for api in hass.data[DOMAIN].values()
]
)
async_dispatcher_send(hass, SIGNAL_UPDATE_RING)
async def async_refresh_all(_):
"""Refresh all ring data."""
for info in hass.data[DOMAIN].values():
await info["device_data"].async_refresh_all()
await info["dings_data"].async_refresh_all()
await hass.async_add_executor_job(info["history_data"].refresh_all)
await hass.async_add_executor_job(info["health_data"].refresh_all)

# register service
hass.services.async_register(DOMAIN, "update", refresh_all)

# register scan interval for ring
hass.data[DATA_TRACK_INTERVAL] = async_track_time_interval(
hass, refresh_all, SCAN_INTERVAL
)
hass.data[DATA_HEALTH_DATA_TRACKER] = HealthDataUpdater(hass)
hass.data[DATA_HISTORY] = HistoryCache(hass)
hass.services.async_register(DOMAIN, "update", async_refresh_all)

return True

Expand All @@ -146,98 +156,141 @@ async def async_unload_entry(hass, entry):
if len(hass.data[DOMAIN]) != 0:
return True

# Last entry unloaded, clean up
hass.data.pop(DATA_TRACK_INTERVAL)()
hass.data.pop(DATA_HEALTH_DATA_TRACKER)
hass.data.pop(DATA_HISTORY)
# Last entry unloaded, clean up service
hass.services.async_remove(DOMAIN, "update")

return True


class HealthDataUpdater:
"""Data storage for health data."""
class GlobalDataUpdater:
"""Data storage for single API endpoint."""

def __init__(self, hass):
"""Track devices that need health data updated."""
def __init__(
self,
hass: HomeAssistant,
config_entry_id: str,
ring: Ring,
update_method: str,
update_interval: timedelta,
):
"""Initialize global data updater."""
self.hass = hass
self.config_entry_id = config_entry_id
self.ring = ring
self.update_method = update_method
self.update_interval = update_interval
self.listeners = []
self._unsub_interval = None

@callback
def async_add_listener(self, update_callback):
"""Listen for data updates."""
# This is the first listener, set up interval.
if not self.listeners:
self._unsub_interval = async_track_time_interval(
self.hass, self.async_refresh_all, self.update_interval
)

self.listeners.append(update_callback)

@callback
def async_remove_listener(self, update_callback):
"""Remove data update."""
self.listeners.remove(update_callback)

if not self.listeners:
self._unsub_interval()
self._unsub_interval = None

async def async_refresh_all(self, _now: Optional[int] = None) -> None:
"""Time to update."""
if not self.listeners:
return

try:
await self.hass.async_add_executor_job(
getattr(self.ring, self.update_method)
)
except AccessDeniedError:
_LOGGER.error("Ring access token is no longer valid. Set up Ring again")
await self.hass.config_entries.async_unload(self.config_entry_id)
return

for update_callback in self.listeners:
update_callback()


class DeviceDataUpdater:
"""Data storage for device data."""

def __init__(
self,
hass: HomeAssistant,
config_entry_id: str,
ring: Ring,
update_method: str,
update_interval: timedelta,
):
"""Initialize device data updater."""
self.hass = hass
self.config_entry_id = config_entry_id
self.ring = ring
self.update_method = update_method
self.update_interval = update_interval
self.devices = {}
self._unsub_interval = None

async def track_device(self, config_entry_id, device):
async def async_track_device(self, device, update_callback):
"""Track a device."""
if not self.devices:
self._unsub_interval = async_track_time_interval(
self.hass, self.refresh_all, SCAN_INTERVAL
self.hass, self.refresh_all, self.update_interval
)

key = (config_entry_id, device.device_id)

if key not in self.devices:
self.devices[key] = {
if device.device_id not in self.devices:
self.devices[device.device_id] = {
"device": device,
"count": 1,
"update_callbacks": [update_callback],
"data": None,
}
# Store task so that other concurrent requests can wait for us to finish and
# data be available.
self.devices[device.device_id]["task"] = asyncio.current_task()
self.devices[device.device_id][
"data"
] = await self.hass.async_add_executor_job(self.update_method, device)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does tracking a device also need to update the data? Couldn't we just let the consumer get the data from the cache?

Why do we need to let the platforms track devices? We already know what devices there are. We could let the component control the device interface completely and just have platforms get data as needed from the cache.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If an entity is disabled, we wouldn't want to fetch that data.

self.devices[device.device_id].pop("task")
else:
self.devices[key]["count"] += 1
self.devices[device.device_id]["update_callbacks"].append(update_callback)
# If someone is currently fetching data as part of the initialization, wait for them
if "task" in self.devices[device.device_id]:
await self.devices[device.device_id]["task"]

await self.hass.async_add_executor_job(device.update_health_data)
update_callback(self.devices[device.device_id]["data"])

@callback
def untrack_device(self, config_entry_id, device):
def async_untrack_device(self, device, update_callback):
"""Untrack a device."""
key = (config_entry_id, device.device_id)
self.devices[key]["count"] -= 1
self.devices[device.device_id]["update_callbacks"].remove(update_callback)

if self.devices[key]["count"] == 0:
self.devices.pop(key)
if not self.devices[device.device_id]["update_callbacks"]:
self.devices.pop(device.device_id)

if not self.devices:
self._unsub_interval()
self._unsub_interval = None

def refresh_all(self, _):
def refresh_all(self, _=None):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this not an async interface? We seem to be calling this from async context and two out of three calls inside are async.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jumping into the executor 10 times inside a loop is a lot of overhead of jumping around between async and sync. It's in that case better to run in sync and just schedule the callbacks to be called.

"""Refresh all registered devices."""
for info in self.devices.values():
info["device"].update_health_data()

dispatcher_send(self.hass, SIGNAL_UPDATE_HEALTH_RING)


class HistoryCache:
"""Helper to fetch history."""

STALE_AFTER = 10 # seconds

def __init__(self, hass):
"""Initialize history cache."""
self.hass = hass
self.cache = {}

async def async_get_history(self, config_entry_id, device):
"""Get history of a device."""
key = (config_entry_id, device.device_id)

if key in self.cache:
info = self.cache[key]

# We're already fetching data, join that task
if "task" in info:
return await info["task"]

# We have valid cache info, return that
if time() - info["created_at"] < self.STALE_AFTER:
return info["data"]

self.cache.pop(key)

# Fetch data
task = self.hass.async_add_executor_job(partial(device.history, limit=10))

self.cache[key] = {"task": task}

data = await task

self.cache[key] = {"created_at": time(), "data": data}

return data
try:
data = info["data"] = self.update_method(info["device"])
except AccessDeniedError:
_LOGGER.error("Ring access token is no longer valid. Set up Ring again")
self.hass.add_job(
self.hass.config_entries.async_unload(self.config_entry_id)
)
return

for update_callback in info["update_callbacks"]:
self.hass.loop.call_soon_threadsafe(update_callback, data)
Loading