From 064184762851523c8fb9c0d99983972fb421e734 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Thu, 25 Mar 2021 22:34:45 -1000 Subject: [PATCH 1/3] Optimize storage collection entity operations with asyncio.gather We can group like operations which speeds up startup by adding entities concurrently --- homeassistant/helpers/collection.py | 75 ++++++++++++++++++++++------- 1 file changed, 57 insertions(+), 18 deletions(-) diff --git a/homeassistant/helpers/collection.py b/homeassistant/helpers/collection.py index 0c74ac413e7e46..fac0fe7f7a687c 100644 --- a/homeassistant/helpers/collection.py +++ b/homeassistant/helpers/collection.py @@ -55,6 +55,8 @@ class CollectionChangeSet: Awaitable[None], ] +ChangeSetListener = Callable[[Iterable[CollectionChangeSet]], Awaitable[None]] + class CollectionError(HomeAssistantError): """Base class for collection related errors.""" @@ -106,6 +108,7 @@ def __init__(self, logger: logging.Logger, id_manager: IDManager | None = None): self.id_manager = id_manager or IDManager() self.data: dict[str, dict] = {} self.listeners: list[ChangeListener] = [] + self.change_set_listeners: list[ChangeSetListener] = [] self.id_manager.add_collection(self.data) @@ -122,6 +125,14 @@ def async_add_listener(self, listener: ChangeListener) -> None: """ self.listeners.append(listener) + @callback + def async_add_change_set_listener(self, listener: ChangeSetListener) -> None: + """Add a listener for a full change set. + + Will be called with an iterable of CollectionChangeSet objects.
+ """ + self.change_set_listeners.append(listener) + async def notify_changes(self, change_sets: Iterable[CollectionChangeSet]) -> None: """Notify listeners of a change.""" await asyncio.gather( @@ -129,7 +140,11 @@ async def notify_changes(self, change_sets: Iterable[CollectionChangeSet]) -> No listener(change_set.change_type, change_set.item_id, change_set.item) for listener in self.listeners for change_set in change_sets - ] + ], + *[ + change_set_listener(change_sets) + for change_set_listener in self.change_set_listeners + ], ) @@ -312,29 +327,53 @@ def sync_entity_lifecycle( ) -> None: """Map a collection to an entity component.""" entities = {} + ent_reg = entity_registry.async_get(hass) - async def _collection_changed(change_type: str, item_id: str, config: dict) -> None: + async def _collection_changed(change_sets: Iterable[CollectionChangeSet]) -> None: """Handle a collection change.""" - if change_type == CHANGE_ADDED: - entity = create_entity(config) - await entity_component.async_add_entities([entity]) - entities[item_id] = entity - return - - if change_type == CHANGE_REMOVED: - ent_reg = await entity_registry.async_get_registry(hass) - ent_to_remove = ent_reg.async_get_entity_id(domain, platform, item_id) + + async def _add_entity(change_set: CollectionChangeSet) -> Entity: + entities[change_set.item_id] = create_entity(change_set.item) + return entities[change_set.item_id] + + async def _remove_entity(change_set: CollectionChangeSet) -> None: + ent_to_remove = ent_reg.async_get_entity_id( + domain, platform, change_set.item_id + ) if ent_to_remove is not None: ent_reg.async_remove(ent_to_remove) else: - await entities[item_id].async_remove(force_remove=True) - entities.pop(item_id) - return - - # CHANGE_UPDATED - await entities[item_id].async_update_config(config) # type: ignore + await entities[change_set.item_id].async_remove(force_remove=True) + entities.pop(change_set.item_id) + + async def _update_entity(change_set: CollectionChangeSet) -> None: 
+ await entities[change_set.item_id].async_update_config(change_set.item) # type: ignore + + _func_map = { + CHANGE_ADDED: _add_entity, + CHANGE_REMOVED: _remove_entity, + CHANGE_UPDATED: _update_entity, + } + # Create a new bucket every time we have a different change type + # to ensure operations happen in order. We only group + # the same change type. + task_buckets: list = [] + previous_change_type = None + for change_set in change_sets: + if previous_change_type != change_set.change_type: + task_buckets.append([]) + task_buckets[-1].append(_func_map[change_set.change_type](change_set)) + + for task_bucket in task_buckets: + new_entities = [ + entity + for entity in await asyncio.gather(*task_bucket) + if entity is not None + ] + if new_entities: + await entity_component.async_add_entities(new_entities) - collection.async_add_listener(_collection_changed) + collection.async_add_change_set_listener(_collection_changed) class StorageCollectionWebsocket: From 35ed1c203cd4bb3bfa7e56ff9e9df9b153b1680e Mon Sep 17 00:00:00 2001 From: "J. 
Nick Koston" Date: Thu, 25 Mar 2021 22:46:17 -1000 Subject: [PATCH 2/3] scope --- homeassistant/helpers/collection.py | 42 ++++++++++++++--------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/homeassistant/helpers/collection.py b/homeassistant/helpers/collection.py index fac0fe7f7a687c..789035baa561bb 100644 --- a/homeassistant/helpers/collection.py +++ b/homeassistant/helpers/collection.py @@ -329,31 +329,31 @@ def sync_entity_lifecycle( entities = {} ent_reg = entity_registry.async_get(hass) - async def _collection_changed(change_sets: Iterable[CollectionChangeSet]) -> None: - """Handle a collection change.""" + async def _add_entity(change_set: CollectionChangeSet) -> Entity: + entities[change_set.item_id] = create_entity(change_set.item) + return entities[change_set.item_id] - async def _add_entity(change_set: CollectionChangeSet) -> Entity: - entities[change_set.item_id] = create_entity(change_set.item) - return entities[change_set.item_id] + async def _remove_entity(change_set: CollectionChangeSet) -> None: + ent_to_remove = ent_reg.async_get_entity_id( + domain, platform, change_set.item_id + ) + if ent_to_remove is not None: + ent_reg.async_remove(ent_to_remove) + else: + await entities[change_set.item_id].async_remove(force_remove=True) + entities.pop(change_set.item_id) - async def _remove_entity(change_set: CollectionChangeSet) -> None: - ent_to_remove = ent_reg.async_get_entity_id( - domain, platform, change_set.item_id - ) - if ent_to_remove is not None: - ent_reg.async_remove(ent_to_remove) - else: - await entities[change_set.item_id].async_remove(force_remove=True) - entities.pop(change_set.item_id) + async def _update_entity(change_set: CollectionChangeSet) -> None: + await entities[change_set.item_id].async_update_config(change_set.item) # type: ignore - async def _update_entity(change_set: CollectionChangeSet) -> None: - await entities[change_set.item_id].async_update_config(change_set.item) # type: ignore + _func_map = { + 
CHANGE_ADDED: _add_entity, + CHANGE_REMOVED: _remove_entity, + CHANGE_UPDATED: _update_entity, + } - _func_map = { - CHANGE_ADDED: _add_entity, - CHANGE_REMOVED: _remove_entity, - CHANGE_UPDATED: _update_entity, - } + async def _collection_changed(change_sets: Iterable[CollectionChangeSet]) -> None: + """Handle a collection change.""" # Create a new bucket every time we have a different change type # to ensure operations happen in order. We only group # the same change type. From d8afa7b2a56c2c6bbfcab67558be465d16e56bce Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sat, 3 Apr 2021 21:32:09 -1000 Subject: [PATCH 3/3] use groupby --- homeassistant/helpers/collection.py | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/homeassistant/helpers/collection.py b/homeassistant/helpers/collection.py index 789035baa561bb..998258649de0d5 100644 --- a/homeassistant/helpers/collection.py +++ b/homeassistant/helpers/collection.py @@ -4,8 +4,9 @@ from abc import ABC, abstractmethod import asyncio from dataclasses import dataclass +from itertools import groupby import logging -from typing import Any, Awaitable, Callable, Iterable, Optional, cast +from typing import Any, Awaitable, Callable, Coroutine, Iterable, Optional, cast import voluptuous as vol from voluptuous.humanize import humanize_error @@ -346,7 +347,9 @@ async def _remove_entity(change_set: CollectionChangeSet) -> None: async def _update_entity(change_set: CollectionChangeSet) -> None: await entities[change_set.item_id].async_update_config(change_set.item) # type: ignore - _func_map = { + _func_map: dict[ + str, Callable[[CollectionChangeSet], Coroutine[Any, Any, Entity | None]] + ] = { CHANGE_ADDED: _add_entity, CHANGE_REMOVED: _remove_entity, CHANGE_UPDATED: _update_entity, @@ -357,17 +360,17 @@ async def _collection_changed(change_sets: Iterable[CollectionChangeSet]) -> Non # Create a new bucket every time we have a different change type # to ensure operations happen 
in order. We only group # the same change type. - task_buckets: list = [] - previous_change_type = None - for change_set in change_sets: - if previous_change_type != change_set.change_type: - task_buckets.append([]) - task_buckets[-1].append(_func_map[change_set.change_type](change_set)) - - for task_bucket in task_buckets: + for _, grouped in groupby( + change_sets, lambda change_set: change_set.change_type + ): new_entities = [ entity - for entity in await asyncio.gather(*task_bucket) + for entity in await asyncio.gather( + *[ + _func_map[change_set.change_type](change_set) + for change_set in grouped + ] + ) if entity is not None ] if new_entities: