Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions homeassistant/components/recorder/__init__.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
"""Support for recording details."""
import asyncio
from collections import namedtuple
import concurrent.futures
from datetime import datetime
import logging
import queue
import sqlite3
import threading
import time
from typing import Any, Callable, List, Optional
from typing import Any, Callable, List, NamedTuple, Optional

from sqlalchemy import create_engine, event as sqlalchemy_event, exc, select
from sqlalchemy.orm import scoped_session, sessionmaker
Expand Down Expand Up @@ -223,7 +222,11 @@ async def async_handle_disable_service(service):
return await instance.async_db_ready


PurgeTask = namedtuple("PurgeTask", ["keep_days", "repack"])
class PurgeTask(NamedTuple):
"""Object to store information about purge task."""

keep_days: int
repack: bool


class WaitTask:
Expand Down
39 changes: 39 additions & 0 deletions homeassistant/components/recorder/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ class Events(Base): # type: ignore
Index("ix_events_event_type_time_fired", "event_type", "time_fired"),
)

def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
return (
f"<recorder.Events("
f"id={self.event_id}, type='{self.event_type}', data='{self.event_data}', "
f"origin='{self.origin}', time_fired='{self.time_fired}'"
f")>"
)

@staticmethod
def from_event(event, event_data=None):
"""Create an event database object from a native event."""
Expand Down Expand Up @@ -128,6 +137,17 @@ class States(Base): # type: ignore
Index("ix_states_entity_id_last_updated", "entity_id", "last_updated"),
)

def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
return (
f"<recorder.States("
f"id={self.state_id}, domain='{self.domain}', entity_id='{self.entity_id}', "
f"state='{self.state}', event_id='{self.event_id}', "
f"last_updated='{self.last_updated.isoformat(sep=' ', timespec='seconds')}', "
f"old_state_id={self.old_state_id}"
f")>"
)

@staticmethod
def from_event(event):
"""Create object from a state_changed event."""
Expand Down Expand Up @@ -184,6 +204,16 @@ class RecorderRuns(Base): # type: ignore

__table_args__ = (Index("ix_recorder_runs_start_end", "start", "end"),)

def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
return (
f"<recorder.RecorderRuns("
f"id={self.run_id}, start='{self.start.isoformat(sep='', timespec='seconds')}', "
f"end='{self.end.isoformat(sep='', timespec='seconds')}', closed_incorrect={self.closed_incorrect}, "
f"created='{self.created.isoformat(sep='', timespec='seconds')}'"
f")>"
)

def entity_ids(self, point_in_time=None):
"""Return the entity ids that existed in this run.

Expand Down Expand Up @@ -218,6 +248,15 @@ class SchemaChanges(Base): # type: ignore
schema_version = Column(Integer)
changed = Column(DateTime(timezone=True), default=dt_util.utcnow)

def __repr__(self) -> str:
"""Return string representation of instance for debugging."""
return (
f"<recorder.SchemaChanges("
f"id={self.change_id}, schema_version={self.schema_version}, "
f"changed='{self.changed.isoformat(sep=' ', timespec='seconds')}'"
f")>"
)


def process_timestamp(ts):
"""Process a timestamp into datetime object."""
Expand Down
21 changes: 11 additions & 10 deletions homeassistant/components/recorder/purge.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,16 @@ def purge_old_data(instance: Recorder, purge_days: int, repack: bool) -> bool:
Cleans up an timeframe of an hour, based on the oldest record.
"""
purge_before = dt_util.utcnow() - timedelta(days=purge_days)
_LOGGER.debug("Purging states and events before target %s", purge_before)
_LOGGER.debug(
"Purging states and events before target %s",
purge_before.isoformat(sep=" ", timespec="seconds"),
)
try:
with session_scope(session=instance.get_session()) as session: # type: ignore
# Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states or events record
event_ids = _select_event_ids_to_purge(session, purge_before)
state_ids = _select_state_ids_to_purge(session, purge_before, event_ids)
if state_ids:
_disconnect_states_about_to_be_purged(session, state_ids)
_purge_state_ids(session, state_ids)
if event_ids:
_purge_event_ids(session, event_ids)
Expand Down Expand Up @@ -66,7 +68,7 @@ def purge_old_data(instance: Recorder, purge_days: int, repack: bool) -> bool:
return True


def _select_event_ids_to_purge(session: Session, purge_before: datetime) -> list:
def _select_event_ids_to_purge(session: Session, purge_before: datetime) -> list[int]:
"""Return a list of event ids to purge."""
events = (
session.query(Events.event_id)
Expand All @@ -79,8 +81,8 @@ def _select_event_ids_to_purge(session: Session, purge_before: datetime) -> list


def _select_state_ids_to_purge(
session: Session, purge_before: datetime, event_ids: list
) -> list:
session: Session, purge_before: datetime, event_ids: list[int]
) -> list[int]:
"""Return a list of state ids to purge."""
if not event_ids:
return []
Expand All @@ -94,7 +96,9 @@ def _select_state_ids_to_purge(
return [state.state_id for state in states]


def _disconnect_states_about_to_be_purged(session: Session, state_ids: list) -> None:
def _purge_state_ids(session: Session, state_ids: list[int]) -> None:
"""Disconnect states and delete by state id."""

# Update old_state_id to NULL before deleting to ensure
# the delete does not fail due to a foreign key constraint
# since some databases (MSSQL) cannot do the ON DELETE SET NULL
Expand All @@ -106,9 +110,6 @@ def _disconnect_states_about_to_be_purged(session: Session, state_ids: list) ->
)
_LOGGER.debug("Updated %s states to remove old_state_id", disconnected_rows)


def _purge_state_ids(session: Session, state_ids: list) -> None:
"""Delete by state id."""
deleted_rows = (
session.query(States)
.filter(States.state_id.in_(state_ids))
Expand All @@ -117,7 +118,7 @@ def _purge_state_ids(session: Session, state_ids: list) -> None:
_LOGGER.debug("Deleted %s states", deleted_rows)


def _purge_event_ids(session: Session, event_ids: list) -> None:
def _purge_event_ids(session: Session, event_ids: list[int]) -> None:
"""Delete by event id."""
deleted_rows = (
session.query(Events)
Expand Down
9 changes: 8 additions & 1 deletion homeassistant/components/recorder/util.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
"""SQLAlchemy util functions."""
from __future__ import annotations

from collections.abc import Generator
from contextlib import contextmanager
from datetime import timedelta
import logging
import os
import time

from sqlalchemy.exc import OperationalError, SQLAlchemyError
from sqlalchemy.orm.session import Session

from homeassistant.helpers.typing import HomeAssistantType
import homeassistant.util.dt as dt_util

from .const import CONF_DB_INTEGRITY_CHECK, DATA_INSTANCE, SQLITE_URL_PREFIX
Expand All @@ -25,7 +30,9 @@


@contextmanager
def session_scope(*, hass=None, session=None):
def session_scope(
*, hass: HomeAssistantType | None = None, session: Session | None = None
) -> Generator[Session, None, None]:
"""Provide a transactional scope around a series of operations."""
if session is None and hass is not None:
session = hass.data[DATA_INSTANCE].get_session()
Expand Down
38 changes: 33 additions & 5 deletions tests/components/recorder/common.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
"""Common test utils for working with recorder."""

from datetime import timedelta

from homeassistant import core as ha
from homeassistant.components import recorder
from homeassistant.helpers.typing import HomeAssistantType
from homeassistant.util import dt as dt_util

from tests.common import fire_time_changed
from tests.common import async_fire_time_changed, fire_time_changed


def wait_recording_done(hass):
def wait_recording_done(hass: HomeAssistantType) -> None:
"""Block till recording is done."""
hass.block_till_done()
trigger_db_commit(hass)
Expand All @@ -17,18 +18,45 @@ def wait_recording_done(hass):
hass.block_till_done()


async def async_wait_recording_done(hass):
async def async_wait_recording_done_without_instance(hass: HomeAssistantType) -> None:
"""Block till recording is done."""
await hass.loop.run_in_executor(None, wait_recording_done, hass)


def trigger_db_commit(hass):
def trigger_db_commit(hass: HomeAssistantType) -> None:
"""Force the recorder to commit."""
for _ in range(recorder.DEFAULT_COMMIT_INTERVAL):
# We only commit on time change
fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=1))


async def async_wait_recording_done(
hass: HomeAssistantType,
instance: recorder.Recorder,
) -> None:
"""Async wait until recording is done."""
await hass.async_block_till_done()
async_trigger_db_commit(hass)
await hass.async_block_till_done()
await async_recorder_block_till_done(hass, instance)
await hass.async_block_till_done()


@ha.callback
def async_trigger_db_commit(hass: HomeAssistantType) -> None:
"""Fore the recorder to commit. Async friendly."""
for _ in range(recorder.DEFAULT_COMMIT_INTERVAL):
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=1))


async def async_recorder_block_till_done(
hass: HomeAssistantType,
instance: recorder.Recorder,
) -> None:
"""Non blocking version of recorder.block_till_done()."""
await hass.async_add_executor_job(instance.block_till_done)


def corrupt_db_file(test_db_file):
"""Corrupt an sqlite3 database file."""
with open(test_db_file, "w+") as fhandle:
Expand Down
36 changes: 35 additions & 1 deletion tests/components/recorder/conftest.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,24 @@
"""Common test tools."""
from __future__ import annotations

from collections.abc import AsyncGenerator
from typing import Awaitable, Callable, cast

import pytest

from homeassistant.components.recorder import Recorder
from homeassistant.components.recorder.const import DATA_INSTANCE
from homeassistant.helpers.typing import ConfigType, HomeAssistantType

from .common import async_recorder_block_till_done

from tests.common import get_test_home_assistant, init_recorder_component
from tests.common import (
async_init_recorder_component,
get_test_home_assistant,
init_recorder_component,
)

SetupRecorderInstanceT = Callable[..., Awaitable[Recorder]]


@pytest.fixture
Expand All @@ -22,3 +36,23 @@ def setup_recorder(config=None):

yield setup_recorder
hass.stop()


@pytest.fixture
async def async_setup_recorder_instance() -> AsyncGenerator[
SetupRecorderInstanceT, None
]:
"""Yield callable to setup recorder instance."""

async def async_setup_recorder(
hass: HomeAssistantType, config: ConfigType | None = None
) -> Recorder:
"""Setup and return recorder instance.""" # noqa: D401
await async_init_recorder_component(hass, config)
await hass.async_block_till_done()
instance = cast(Recorder, hass.data[DATA_INSTANCE])
await async_recorder_block_till_done(hass, instance)
assert isinstance(instance, Recorder)
return instance

yield async_setup_recorder
27 changes: 18 additions & 9 deletions tests/components/recorder/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,17 @@
STATE_UNLOCKED,
)
from homeassistant.core import Context, CoreState, callback
from homeassistant.helpers.typing import HomeAssistantType
from homeassistant.setup import async_setup_component, setup_component
from homeassistant.util import dt as dt_util

from .common import async_wait_recording_done, corrupt_db_file, wait_recording_done
from .common import (
async_wait_recording_done,
async_wait_recording_done_without_instance,
corrupt_db_file,
wait_recording_done,
)
from .conftest import SetupRecorderInstanceT

from tests.common import (
async_init_recorder_component,
Expand Down Expand Up @@ -62,17 +69,19 @@ async def test_shutdown_before_startup_finishes(hass):
assert run_info.end is not None


def test_saving_state(hass, hass_recorder):
async def test_saving_state(
hass: HomeAssistantType, async_setup_recorder_instance: SetupRecorderInstanceT
):
"""Test saving and restoring a state."""
hass = hass_recorder()
instance = await async_setup_recorder_instance(hass)

entity_id = "test.recorder"
state = "restoring_from_db"
attributes = {"test_attr": 5, "test_attr_10": "nice"}

hass.states.set(entity_id, state, attributes)
hass.states.async_set(entity_id, state, attributes)

wait_recording_done(hass)
await async_wait_recording_done(hass, instance)

with session_scope(hass=hass) as session:
db_states = list(session.query(States))
Expand Down Expand Up @@ -690,23 +699,23 @@ def _create_tmpdir_for_test_db():

hass.states.async_set("test.lost", "on", {})

await async_wait_recording_done(hass)
await async_wait_recording_done_without_instance(hass)
await hass.async_add_executor_job(corrupt_db_file, test_db_file)
await async_wait_recording_done(hass)
await async_wait_recording_done_without_instance(hass)

# This state will not be recorded because
# the database corruption will be discovered
# and we will have to rollback to recover
hass.states.async_set("test.one", "off", {})
await async_wait_recording_done(hass)
await async_wait_recording_done_without_instance(hass)

assert "Unrecoverable sqlite3 database corruption detected" in caplog.text
assert "The system will rename the corrupt database file" in caplog.text
assert "Connected to recorder database" in caplog.text

# This state should go into the new database
hass.states.async_set("test.two", "on", {})
await async_wait_recording_done(hass)
await async_wait_recording_done_without_instance(hass)

def _get_last_state():
with session_scope(hass=hass) as session:
Expand Down
Loading