Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 123 additions & 49 deletions homeassistant/components/iqvia/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging

from pyiqvia import Client
from pyiqvia.errors import InvalidZipError
from pyiqvia.errors import InvalidZipError, IQVIAError
import voluptuous as vol

from homeassistant.config_entries import SOURCE_IMPORT
Expand All @@ -17,7 +17,6 @@
)
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.util.decorator import Registry

from .config_flow import configured_instances
from .const import (
Expand All @@ -43,20 +42,20 @@

_LOGGER = logging.getLogger(__name__)

API_CATEGORY_MAPPING = {
TYPE_ALLERGY_TODAY: TYPE_ALLERGY_INDEX,
TYPE_ALLERGY_TOMORROW: TYPE_ALLERGY_INDEX,
TYPE_ALLERGY_TOMORROW: TYPE_ALLERGY_INDEX,
TYPE_ASTHMA_TODAY: TYPE_ASTHMA_INDEX,
TYPE_ASTHMA_TOMORROW: TYPE_ALLERGY_INDEX,
TYPE_DISEASE_TODAY: TYPE_DISEASE_INDEX,
}

DATA_CONFIG = "config"

DEFAULT_ATTRIBUTION = "Data provided by IQVIA™"
DEFAULT_SCAN_INTERVAL = timedelta(minutes=30)

FETCHER_MAPPING = {
(TYPE_ALLERGY_FORECAST,): (TYPE_ALLERGY_FORECAST, TYPE_ALLERGY_OUTLOOK),
(TYPE_ALLERGY_TODAY, TYPE_ALLERGY_TOMORROW): (TYPE_ALLERGY_INDEX,),
(TYPE_ASTHMA_FORECAST,): (TYPE_ASTHMA_FORECAST,),
(TYPE_ASTHMA_TODAY, TYPE_ASTHMA_TOMORROW): (TYPE_ASTHMA_INDEX,),
(TYPE_DISEASE_FORECAST,): (TYPE_DISEASE_FORECAST,),
(TYPE_DISEASE_TODAY,): (TYPE_DISEASE_INDEX,),
}

CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.All(
Expand All @@ -75,6 +74,12 @@
)


@callback
def async_get_api_category(sensor_type):
"""Return the API category that a particular sensor type should use."""
return API_CATEGORY_MAPPING.get(sensor_type, sensor_type)


async def async_setup(hass, config):
"""Set up the IQVIA component."""
hass.data[DOMAIN] = {}
Expand Down Expand Up @@ -102,8 +107,9 @@ async def async_setup_entry(hass, config_entry):
"""Set up IQVIA as config entry."""
websession = aiohttp_client.async_get_clientsession(hass)

iqvia = IQVIAData(hass, Client(config_entry.data[CONF_ZIP_CODE], websession))

try:
iqvia = IQVIAData(Client(config_entry.data[CONF_ZIP_CODE], websession))
await iqvia.async_update()
except InvalidZipError:
_LOGGER.error("Invalid ZIP code provided: %s", config_entry.data[CONF_ZIP_CODE])
Expand All @@ -115,16 +121,6 @@ async def async_setup_entry(hass, config_entry):
hass.config_entries.async_forward_entry_setup(config_entry, "sensor")
)

async def refresh(event_time):
"""Refresh IQVIA data."""
_LOGGER.debug("Updating IQVIA data")
await iqvia.async_update()
async_dispatcher_send(hass, TOPIC_DATA_UPDATE)

hass.data[DOMAIN][DATA_LISTENER][config_entry.entry_id] = async_track_time_interval(
hass, refresh, DEFAULT_SCAN_INTERVAL
)

return True


Expand All @@ -143,42 +139,99 @@ async def async_unload_entry(hass, config_entry):
class IQVIAData:
"""Define a data object to retrieve info from IQVIA."""

def __init__(self, client):
def __init__(self, hass, client):
"""Initialize."""
self._async_cancel_time_interval_listener = None
self._client = client
self._hass = hass
self.data = {}
self.zip_code = client.zip_code

self.fetchers = Registry()
self.fetchers.register(TYPE_ALLERGY_FORECAST)(self._client.allergens.extended)
self.fetchers.register(TYPE_ALLERGY_OUTLOOK)(self._client.allergens.outlook)
self.fetchers.register(TYPE_ALLERGY_INDEX)(self._client.allergens.current)
self.fetchers.register(TYPE_ASTHMA_FORECAST)(self._client.asthma.extended)
self.fetchers.register(TYPE_ASTHMA_INDEX)(self._client.asthma.current)
self.fetchers.register(TYPE_DISEASE_FORECAST)(self._client.disease.extended)
self.fetchers.register(TYPE_DISEASE_INDEX)(self._client.disease.current)
self._api_coros = {
TYPE_ALLERGY_FORECAST: client.allergens.extended,
TYPE_ALLERGY_INDEX: client.allergens.current,
TYPE_ALLERGY_OUTLOOK: client.allergens.outlook,
TYPE_ASTHMA_FORECAST: client.asthma.extended,
TYPE_ASTHMA_INDEX: client.asthma.current,
TYPE_DISEASE_FORECAST: client.disease.extended,
TYPE_DISEASE_INDEX: client.disease.current,
}
self._api_category_count = {
TYPE_ALLERGY_FORECAST: 0,
TYPE_ALLERGY_INDEX: 0,
TYPE_ALLERGY_OUTLOOK: 0,
TYPE_ASTHMA_FORECAST: 0,
TYPE_ASTHMA_INDEX: 0,
TYPE_DISEASE_FORECAST: 0,
TYPE_DISEASE_INDEX: 0,
}
self._api_category_locks = {
TYPE_ALLERGY_FORECAST: asyncio.Lock(),
TYPE_ALLERGY_INDEX: asyncio.Lock(),
TYPE_ALLERGY_OUTLOOK: asyncio.Lock(),
TYPE_ASTHMA_FORECAST: asyncio.Lock(),
TYPE_ASTHMA_INDEX: asyncio.Lock(),
TYPE_DISEASE_FORECAST: asyncio.Lock(),
TYPE_DISEASE_INDEX: asyncio.Lock(),
}

async def _async_get_data_from_api(self, api_category):
"""Update and save data for a particular API category."""
if self._api_category_count[api_category] == 0:
return

try:
self.data[api_category] = await self._api_coros[api_category]()
except IQVIAError as err:
_LOGGER.error("Unable to get %s data: %s", api_category, err)
self.data[api_category] = None

async def _async_update_listener_action(self, now):
"""Define an async_track_time_interval action to update data."""
await self.async_update()

@callback
def async_deregister_api_interest(self, sensor_type):
"""Decrement the number of entities with data needs from an API category."""
# If this deregistration should leave us with no registration at all, remove the
# time interval:
if sum(self._api_category_count.values()) == 0:
if self._async_cancel_time_interval_listener:
self._async_cancel_time_interval_listener()
self._async_cancel_time_interval_listener = None
return

api_category = async_get_api_category(sensor_type)
self._api_category_count[api_category] -= 1

async def async_register_api_interest(self, sensor_type):
"""Increment the number of entities with data needs from an API category."""
# If this is the first registration we have, start a time interval:
if not self._async_cancel_time_interval_listener:
self._async_cancel_time_interval_listener = async_track_time_interval(
self._hass, self._async_update_listener_action, DEFAULT_SCAN_INTERVAL,
)

api_category = async_get_api_category(sensor_type)
self._api_category_count[api_category] += 1

# If a sensor registers interest in a particular API call and the data doesn't
# exist for it yet, make the API call and grab the data:
async with self._api_category_locks[api_category]:
if api_category not in self.data:
await self._async_get_data_from_api(api_category)

async def async_update(self):
"""Update IQVIA data."""
tasks = {}

for conditions, fetcher_types in FETCHER_MAPPING.items():
if not any(c in SENSORS for c in conditions):
continue
tasks = [
self._async_get_data_from_api(api_category)
for api_category in self._api_coros
]

for fetcher_type in fetcher_types:
tasks[fetcher_type] = self.fetchers[fetcher_type]()
await asyncio.gather(*tasks)

results = await asyncio.gather(*tasks.values(), return_exceptions=True)

for key, result in zip(tasks, results):
if isinstance(result, Exception):
_LOGGER.error("Unable to get %s data: %s", key, result)
self.data[key] = {}
continue

_LOGGER.debug("Loaded new %s data", key)
self.data[key] = result
_LOGGER.debug("Received new data")
async_dispatcher_send(self._hass, TOPIC_DATA_UPDATE)


class IQVIAEntity(Entity):
Expand Down Expand Up @@ -245,13 +298,34 @@ async def async_added_to_hass(self):
@callback
def update():
"""Update the state."""
self.async_schedule_update_ha_state(True)
self.update_from_latest_data()
self.async_write_ha_state()

self._async_unsub_dispatcher_connect = async_dispatcher_connect(
self.hass, TOPIC_DATA_UPDATE, update
Comment thread
bachya marked this conversation as resolved.
)

await self._iqvia.async_register_api_interest(self._type)
if self._type == TYPE_ALLERGY_FORECAST:
# Entities that express interest in allergy forecast data should also
# express interest in allergy outlook data:
await self._iqvia.async_register_api_interest(TYPE_ALLERGY_OUTLOOK)

self.update_from_latest_data()

async def async_will_remove_from_hass(self):
"""Disconnect dispatcher listener when removed."""
if self._async_unsub_dispatcher_connect:
self._async_unsub_dispatcher_connect()
Comment thread
bachya marked this conversation as resolved.
self._async_unsub_dispatcher_connect = None

self._iqvia.async_deregister_api_interest(self._type)
if self._type == TYPE_ALLERGY_FORECAST:
# Entities that lose interest in allergy forecast data should also lose
# interest in allergy outlook data:
self._iqvia.async_deregister_api_interest(TYPE_ALLERGY_OUTLOOK)

@callback
def update_from_latest_data(self):
"""Update the entity's state."""
raise NotImplementedError()
2 changes: 1 addition & 1 deletion homeassistant/components/iqvia/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
TYPE_ALLERGY_FORECAST: ("Allergy Index: Forecasted Average", "mdi:flower"),
TYPE_ALLERGY_TODAY: ("Allergy Index: Today", "mdi:flower"),
TYPE_ALLERGY_TOMORROW: ("Allergy Index: Tomorrow", "mdi:flower"),
TYPE_ASTHMA_FORECAST: ("Asthma Index: Forecasted Average", "mdi:flower"),
TYPE_ASTHMA_TODAY: ("Asthma Index: Today", "mdi:flower"),
TYPE_ASTHMA_TOMORROW: ("Asthma Index: Tomorrow", "mdi:flower"),
TYPE_ASTHMA_FORECAST: ("Asthma Index: Forecasted Average", "mdi:flower"),
TYPE_DISEASE_FORECAST: ("Cold & Flu: Forecasted Average", "mdi:snowflake"),
TYPE_DISEASE_TODAY: ("Cold & Flu Index: Today", "mdi:pill"),
}
36 changes: 16 additions & 20 deletions homeassistant/components/iqvia/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from homeassistant.components.iqvia import (
DATA_CLIENT,
DOMAIN,
SENSORS,
TYPE_ALLERGY_FORECAST,
TYPE_ALLERGY_INDEX,
TYPE_ALLERGY_OUTLOOK,
Expand All @@ -23,6 +22,9 @@
IQVIAEntity,
)
from homeassistant.const import ATTR_STATE
from homeassistant.core import callback

from .const import SENSORS

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -65,13 +67,14 @@ async def async_setup_entry(hass, entry, async_add_entities):
TYPE_DISEASE_TODAY: IndexSensor,
}

sensors = []
for sensor_type in SENSORS:
klass = sensor_class_mapping[sensor_type]
name, icon = SENSORS[sensor_type]
sensors.append(klass(iqvia, sensor_type, name, icon, iqvia.zip_code))

async_add_entities(sensors, True)
async_add_entities(
[
sensor_class_mapping[sensor_type](
iqvia, sensor_type, name, icon, iqvia.zip_code
)
for sensor_type, (name, icon) in SENSORS.items()
]
)


def calculate_trend(indices):
Expand All @@ -93,9 +96,10 @@ def calculate_trend(indices):
class ForecastSensor(IQVIAEntity):
"""Define sensor related to forecast data."""

async def async_update(self):
@callback
def update_from_latest_data(self):
"""Update the sensor."""
if not self._iqvia.data:
if not self._iqvia.data.get(self._type):
return

data = self._iqvia.data[self._type].get("Location")
Expand Down Expand Up @@ -131,12 +135,10 @@ async def async_update(self):
class IndexSensor(IQVIAEntity):
"""Define sensor related to indices."""

async def async_update(self):
@callback
def update_from_latest_data(self):
"""Update the sensor."""
if not self._iqvia.data:
_LOGGER.warning(
"IQVIA didn't return data for %s; trying again later", self.name
)
return

try:
Expand All @@ -147,19 +149,13 @@ async def async_update(self):
elif self._type == TYPE_DISEASE_TODAY:
data = self._iqvia.data[TYPE_DISEASE_INDEX].get("Location")
except KeyError:
_LOGGER.warning(
"IQVIA didn't return data for %s; trying again later", self.name
)
return

key = self._type.split("_")[-1].title()

try:
[period] = [p for p in data["periods"] if p["Type"] == key]
except ValueError:
_LOGGER.warning(
"IQVIA didn't return data for %s; trying again later", self.name
)
return

[rating] = [
Expand Down