Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
bf7e984
Add streaming logbook websocket endpoint
bdraco May 21, 2022
59548c1
slightly faster - but still very tiny on the profile anyways
bdraco May 21, 2022
ce973c8
try LRU
bdraco May 21, 2022
64bdee5
move dep
bdraco May 21, 2022
5cadbf2
Revert "try LRU"
bdraco May 21, 2022
ecf1435
Revert "move dep"
bdraco May 21, 2022
f411cec
tweak for performance
bdraco May 21, 2022
e02e07f
tweak
bdraco May 21, 2022
d4f6207
hold it the other way
bdraco May 21, 2022
2bc9c05
fix missing recorder mock
bdraco May 21, 2022
05b04c1
reduce
bdraco May 21, 2022
0d4a9c6
cleanup logic
bdraco May 21, 2022
c6bca0f
ensure removals are not prop
bdraco May 21, 2022
5edeb5b
simplify
bdraco May 21, 2022
29b18ab
simplify
bdraco May 21, 2022
62caddc
simplify
bdraco May 21, 2022
11bc452
simplify
bdraco May 21, 2022
207505c
fix
bdraco May 21, 2022
77a25ff
reduce
bdraco May 21, 2022
e1ca7e1
reduce
bdraco May 21, 2022
fd69fac
reduce
bdraco May 21, 2022
4a69ac6
reduce
bdraco May 21, 2022
5563b5c
reduce
bdraco May 21, 2022
195f5c1
pre filter by uom/state class, and last_changed != last_updated when …
bdraco May 21, 2022
fe6287c
fix filtering of events
bdraco May 21, 2022
8c0da9d
handle queue full
bdraco May 21, 2022
970073a
fix comparison
bdraco May 21, 2022
bd35636
tweaks to make mypy happy
bdraco May 21, 2022
dc616a6
tests for devices
bdraco May 21, 2022
b829cdf
always reply even if there are no results so the frontend does not lo…
bdraco May 21, 2022
06d5f9c
add live to history sync test
bdraco May 21, 2022
05f76ae
disconnected test
bdraco May 21, 2022
c9404ae
guard against cancellation during setup
bdraco May 21, 2022
b7d82a3
add coverage for overload/failure of consumer case
bdraco May 21, 2022
5d1f1b0
add coverage for overload/failure of consumer case
bdraco May 21, 2022
cd089fb
no post filtering needed, always get a row now
bdraco May 21, 2022
53d180f
touch ups
bdraco May 21, 2022
1371122
touch ups
bdraco May 21, 2022
0c1ef0d
touch ups
bdraco May 21, 2022
9ff2561
touch ups
bdraco May 21, 2022
8d172ee
handle event storms better
bdraco May 21, 2022
c281ad1
apply target directly when there is no filter
bdraco May 22, 2022
1ddd36d
Update homeassistant/components/logbook/websocket_api.py
bdraco May 22, 2022
422976c
Update homeassistant/components/logbook/websocket_api.py
bdraco May 22, 2022
8601369
Update homeassistant/components/logbook/websocket_api.py
bdraco May 22, 2022
39f7f0a
drop task done, add guard
bdraco May 22, 2022
a3019b5
remove comment since it's not actually possible
bdraco May 22, 2022
05a7285
avoid exposing the commit interval details to logbook
bdraco May 22, 2022
3ab008e
handle the case where the task is canceled before streaming starts
bdraco May 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
732 changes: 20 additions & 712 deletions homeassistant/components/logbook/__init__.py

Large diffs are not rendered by default.

39 changes: 39 additions & 0 deletions homeassistant/components/logbook/const.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""Event parser and human readable log generator."""
from __future__ import annotations

from homeassistant.components.automation import EVENT_AUTOMATION_TRIGGERED
from homeassistant.components.script import EVENT_SCRIPT_STARTED
from homeassistant.const import EVENT_CALL_SERVICE, EVENT_LOGBOOK_ENTRY

# Event-data key carrying the human readable message of a logbook entry.
ATTR_MESSAGE = "message"

DOMAIN = "logbook"

# Keys used to attach context information (who/what triggered a change)
# to a rendered logbook entry.
CONTEXT_USER_ID = "context_user_id"
CONTEXT_ENTITY_ID = "context_entity_id"
CONTEXT_ENTITY_ID_NAME = "context_entity_id_name"
CONTEXT_EVENT_TYPE = "context_event_type"
CONTEXT_DOMAIN = "context_domain"
CONTEXT_STATE = "context_state"
CONTEXT_SERVICE = "context_service"
CONTEXT_NAME = "context_name"
CONTEXT_MESSAGE = "context_message"

# Keys of a rendered logbook entry dict.
LOGBOOK_ENTRY_DOMAIN = "domain"
LOGBOOK_ENTRY_ENTITY_ID = "entity_id"
LOGBOOK_ENTRY_ICON = "icon"
LOGBOOK_ENTRY_MESSAGE = "message"
LOGBOOK_ENTRY_NAME = "name"
LOGBOOK_ENTRY_STATE = "state"
LOGBOOK_ENTRY_WHEN = "when"

# Built-in (non state_changed) event types the logbook always processes.
ALL_EVENT_TYPES_EXCEPT_STATE_CHANGED = {EVENT_LOGBOOK_ENTRY, EVENT_CALL_SERVICE}
# Event types that can reference an entity_id without having a config entry
# (automations/scripts and manual logbook entries).
ENTITY_EVENTS_WITHOUT_CONFIG_ENTRY = {
    EVENT_LOGBOOK_ENTRY,
    EVENT_AUTOMATION_TRIGGERED,
    EVENT_SCRIPT_STARTED,
}


# hass.data keys for the configured logbook filters.
LOGBOOK_FILTERS = "logbook_filters"
LOGBOOK_ENTITIES_FILTER = "entities_filter"
193 changes: 193 additions & 0 deletions homeassistant/components/logbook/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
"""Event parser and human readable log generator."""
from __future__ import annotations

from collections.abc import Callable
from typing import Any

from homeassistant.components.sensor import ATTR_STATE_CLASS
from homeassistant.const import (
ATTR_DEVICE_ID,
ATTR_ENTITY_ID,
ATTR_UNIT_OF_MEASUREMENT,
EVENT_LOGBOOK_ENTRY,
EVENT_STATE_CHANGED,
)
from homeassistant.core import (
CALLBACK_TYPE,
Event,
HomeAssistant,
State,
callback,
is_callback,
)
from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.helpers.event import async_track_state_change_event

from .const import (
ALL_EVENT_TYPES_EXCEPT_STATE_CHANGED,
DOMAIN,
ENTITY_EVENTS_WITHOUT_CONFIG_ENTRY,
)
from .models import LazyEventPartialState


def async_filter_entities(hass: HomeAssistant, entity_ids: list[str]) -> list[str]:
    """Filter out any entities that logbook will not produce results for."""
    registry = er.async_get(hass)
    kept: list[str] = []
    for entity_id in entity_ids:
        if _is_entity_id_filtered(hass, registry, entity_id):
            continue
        kept.append(entity_id)
    return kept


def async_determine_event_types(
    hass: HomeAssistant, entity_ids: list[str] | None, device_ids: list[str] | None
) -> tuple[str, ...]:
    """Reduce the event types based on the entity ids and device ids.

    Returns the tuple of event types the logbook must process for the
    requested entities/devices. With no filter at all, every known event
    type (built-in plus externally registered) is returned.
    """
    external_events: dict[
        str, tuple[str, Callable[[LazyEventPartialState], dict[str, Any]]]
    ] = hass.data.get(DOMAIN, {})
    if not entity_ids and not device_ids:
        return (*ALL_EVENT_TYPES_EXCEPT_STATE_CHANGED, *external_events)

    interested_event_types: set[str] = set()

    if entity_ids:
        #
        # Home Assistant doesn't allow firing events from
        # entities so we have a limited list to check
        #
        # automations and scripts can refer to entities
        # but they do not have a config entry so we need
        # to add them.
        #
        # We also allow entity_ids to be recorded via
        # manual logbook entries.
        #
        interested_event_types |= ENTITY_EVENTS_WITHOUT_CONFIG_ENTRY

    if device_ids:
        # Map devices -> config entries -> integration domains, then keep
        # only the external event types registered by those domains.
        dev_reg = dr.async_get(hass)
        config_entry_ids: set[str] = set()
        for device_id in device_ids:
            if (device := dev_reg.async_get(device_id)) and device.config_entries:
                config_entry_ids |= device.config_entries
        interested_domains: set[str] = set()
        for entry_id in config_entry_ids:
            if entry := hass.config_entries.async_get_entry(entry_id):
                interested_domains.add(entry.domain)
        for external_event, domain_call in external_events.items():
            if domain_call[0] in interested_domains:
                interested_event_types.add(external_event)

    # Keep deterministic ordering: EVENT_LOGBOOK_ENTRY first, then the
    # external events in their registration order.
    return tuple(
        event_type
        for event_type in (EVENT_LOGBOOK_ENTRY, *external_events)
        if event_type in interested_event_types
    )


@callback
def async_subscribe_events(
    hass: HomeAssistant,
    subscriptions: list[CALLBACK_TYPE],
    target: Callable[[Event], None],
    event_types: tuple[str, ...],
    entity_ids: list[str] | None,
    device_ids: list[str] | None,
) -> None:
    """Subscribe to events for the entities and devices or all.

    These are the events we need to listen for to do
    the live logbook stream.

    Every listener's unsubscribe callback is appended to *subscriptions*
    so the caller can tear the stream down later.
    """
    ent_reg = er.async_get(hass)
    # target must be a @callback so the forwarders below can invoke it
    # synchronously from the event bus.
    assert is_callback(target), "target must be a callback"
    event_forwarder = target

    if entity_ids or device_ids:
        entity_ids_set = set(entity_ids) if entity_ids else set()
        device_ids_set = set(device_ids) if device_ids else set()

        @callback
        def _forward_events_filtered(event: Event) -> None:
            # Forward only events referencing one of the requested
            # entities or devices.
            event_data = event.data
            if (
                entity_ids_set and event_data.get(ATTR_ENTITY_ID) in entity_ids_set
            ) or (device_ids_set and event_data.get(ATTR_DEVICE_ID) in device_ids_set):
                target(event)

        event_forwarder = _forward_events_filtered

    for event_type in event_types:
        subscriptions.append(
            hass.bus.async_listen(event_type, event_forwarder, run_immediately=True)
        )

    @callback
    def _forward_state_events_filtered(event: Event) -> None:
        # Skip entity additions/removals (missing old or new state) and
        # states the logbook considers insignificant.
        if event.data.get("old_state") is None or event.data.get("new_state") is None:
            return
        state: State = event.data["new_state"]
        if not _is_state_filtered(ent_reg, state):
            target(event)

    if entity_ids:
        # Only track the requested entities' state changes.
        subscriptions.append(
            async_track_state_change_event(
                hass, entity_ids, _forward_state_events_filtered
            )
        )
        return

    # We want the firehose
    subscriptions.append(
        hass.bus.async_listen(
            EVENT_STATE_CHANGED,
            _forward_state_events_filtered,
            run_immediately=True,
        )
    )


def is_sensor_continuous(ent_reg: er.EntityRegistry, entity_id: str) -> bool:
    """Determine if a sensor is continuous by checking its state class.

    Sensors with a unit_of_measurement are also considered continuous, but are filtered
    already by the SQL query generated by _get_events
    """
    entry = ent_reg.async_get(entity_id)
    if entry is None:
        # Entity not registered, so can't have a state class
        return False
    capabilities = entry.capabilities
    return capabilities is not None and capabilities.get(ATTR_STATE_CLASS) is not None


def _is_state_filtered(ent_reg: er.EntityRegistry, state: State) -> bool:
    """Check if the logbook should filter a state.

    Used when we are in live mode to ensure
    we only get significant changes (state.last_changed != state.last_updated)
    """
    if state.last_changed != state.last_updated:
        return True
    if ATTR_UNIT_OF_MEASUREMENT in state.attributes:
        return True
    return is_sensor_continuous(ent_reg, state.entity_id)


def _is_entity_id_filtered(
    hass: HomeAssistant, ent_reg: er.EntityRegistry, entity_id: str
) -> bool:
    """Check if the logbook should filter an entity.

    Used to setup listeners and which entities to select
    from the database when a list of entities is requested.

    An entity is filtered when its current state carries a
    unit_of_measurement, or when its registry entry has a state class
    (a continuous sensor).
    """
    # Rewritten from `(state := get(...)) and B or C` to explicit guard
    # clauses: the implicit `and`/`or` precedence read like a bug.
    state = hass.states.get(entity_id)
    if state is not None and ATTR_UNIT_OF_MEASUREMENT in state.attributes:
        return True
    return is_sensor_continuous(ent_reg, entity_id)
113 changes: 113 additions & 0 deletions homeassistant/components/logbook/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
"""Event parser and human readable log generator."""
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime as dt
import json
from typing import Any, cast

from sqlalchemy.engine.row import Row

from homeassistant.const import ATTR_ICON, EVENT_STATE_CHANGED
from homeassistant.core import Context, Event, State, callback


class LazyEventPartialState:
    """A lazy version of core Event with limited State joined in."""

    __slots__ = [
        "row",
        "_event_data",
        "_event_data_cache",
        "event_type",
        "entity_id",
        "state",
        "context_id",
        "context_user_id",
        "context_parent_id",
        "data",
    ]

    def __init__(
        self,
        row: Row | EventAsRow,
        event_data_cache: dict[str, dict[str, Any]],
    ) -> None:
        """Init the lazy event."""
        self.row = row
        self._event_data: dict[str, Any] | None = None
        self._event_data_cache = event_data_cache
        # Copy the cheap scalar columns straight off the row.
        self.event_type: str | None = row.event_type
        self.entity_id: str | None = row.entity_id
        self.state = row.state
        self.context_id: str | None = row.context_id
        self.context_user_id: str | None = row.context_user_id
        self.context_parent_id: str | None = row.context_parent_id
        if data := getattr(row, "data", None):
            # An EventAsRow already carries decoded data, so the whole
            # json decode step can be skipped.
            self.data = data
            return
        source = cast(str, row.shared_data or row.event_data)
        if not source:
            self.data = {}
            return
        # Decode at most once per distinct payload; identical payloads
        # share one dict through the cache.
        if event_data := self._event_data_cache.get(source):
            self.data = event_data
        else:
            self.data = self._event_data_cache[source] = cast(
                dict[str, Any], json.loads(source)
            )


@dataclass
class EventAsRow:
    """Convert an event to a row.

    Mimics the attribute surface of a database Row so a live Event can be
    fed through the same processing path as rows fetched from the database
    (see LazyEventPartialState, which accepts Row | EventAsRow).
    """

    # Already-decoded event data (avoids the json decode a Row requires).
    data: dict[str, Any]
    context: Context
    context_id: str
    time_fired: dt
    state_id: int
    event_data: str | None = None
    old_format_icon: None = None  # Row-compat placeholder; always None here
    event_id: None = None  # Row-compat placeholder; always None here
    entity_id: str | None = None
    icon: str | None = None
    context_user_id: str | None = None
    context_parent_id: str | None = None
    event_type: str | None = None
    state: str | None = None
    shared_data: str | None = None
    context_only: None = None  # Row-compat placeholder; always None here


@callback
def async_event_to_row(event: Event) -> EventAsRow | None:
    """Convert an event to a row."""
    context = event.context
    if event.event_type != EVENT_STATE_CHANGED:
        # Non state-changed events map straight onto the event columns.
        return EventAsRow(
            data=event.data,
            context=context,
            event_type=event.event_type,
            context_id=context.id,
            context_user_id=context.user_id,
            context_parent_id=context.parent_id,
            time_fired=event.time_fired,
            state_id=hash(event),
        )
    # States are prefiltered so we never get states
    # that are missing new_state or old_state
    # since the logbook does not show these
    new_state: State = event.data["new_state"]
    state_context = new_state.context
    return EventAsRow(
        data=event.data,
        context=context,
        entity_id=new_state.entity_id,
        state=new_state.state,
        context_id=state_context.id,
        context_user_id=state_context.user_id,
        context_parent_id=state_context.parent_id,
        time_fired=new_state.last_updated,
        state_id=hash(event),
        icon=new_state.attributes.get(ATTR_ICON),
    )
Loading