Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,7 @@ omit =
homeassistant/components/raspyrfm/*
homeassistant/components/recollect_waste/__init__.py
homeassistant/components/recollect_waste/sensor.py
homeassistant/components/recorder/repack.py
homeassistant/components/recswitch/switch.py
homeassistant/components/reddit/*
homeassistant/components/rejseplanen/sensor.py
Expand Down
3 changes: 3 additions & 0 deletions homeassistant/components/recorder/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@
DOMAIN = "recorder"

CONF_DB_INTEGRITY_CHECK = "db_integrity_check"

# The maximum number of rows (events) we purge in one delete statement
MAX_ROWS_TO_PURGE = 1000
2 changes: 1 addition & 1 deletion homeassistant/components/recorder/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class States(Base): # type: ignore
last_updated = Column(DateTime(timezone=True), default=dt_util.utcnow, index=True)
created = Column(DateTime(timezone=True), default=dt_util.utcnow)
old_state_id = Column(
Integer, ForeignKey("states.state_id", ondelete="SET NULL"), index=True
Integer, ForeignKey("states.state_id", ondelete="NO ACTION"), index=True
Comment thread
bdraco marked this conversation as resolved.
)
event = relationship("Events", uselist=False)
old_state = relationship("States", remote_side=[state_id])
Expand Down
162 changes: 100 additions & 62 deletions homeassistant/components/recorder/purge.py
Original file line number Diff line number Diff line change
@@ -1,88 +1,51 @@
"""Purge old data helper."""
from datetime import timedelta
from __future__ import annotations

from datetime import datetime, timedelta
import logging
import time
from typing import TYPE_CHECKING

from sqlalchemy.exc import OperationalError, SQLAlchemyError
from sqlalchemy.orm.session import Session

import homeassistant.util.dt as dt_util

from .const import MAX_ROWS_TO_PURGE
from .models import Events, RecorderRuns, States
from .util import execute, session_scope
from .repack import repack_database
from .util import session_scope

if TYPE_CHECKING:
from . import Recorder

_LOGGER = logging.getLogger(__name__)


def purge_old_data(instance, purge_days: int, repack: bool) -> bool:
def purge_old_data(instance: Recorder, purge_days: int, repack: bool) -> bool:
"""Purge events and states older than purge_days ago.

Cleans up an timeframe of an hour, based on the oldest record.
"""
purge_before = dt_util.utcnow() - timedelta(days=purge_days)
Comment thread
cdce8p marked this conversation as resolved.
_LOGGER.debug("Purging states and events before target %s", purge_before)

try:
with session_scope(session=instance.get_session()) as session:
# Purge a max of 1 hour, based on the oldest states or events record
batch_purge_before = purge_before

query = session.query(States).order_by(States.last_updated.asc()).limit(1)
states = execute(query, to_native=True, validate_entity_ids=False)
if states:
batch_purge_before = min(
batch_purge_before,
states[0].last_updated + timedelta(hours=1),
)

query = session.query(Events).order_by(Events.time_fired.asc()).limit(1)
events = execute(query, to_native=True)
if events:
batch_purge_before = min(
batch_purge_before,
events[0].time_fired + timedelta(hours=1),
)

_LOGGER.debug("Purging states and events before %s", batch_purge_before)

deleted_rows = (
session.query(States)
.filter(States.last_updated < batch_purge_before)
.delete(synchronize_session=False)
)
_LOGGER.debug("Deleted %s states", deleted_rows)

deleted_rows = (
session.query(Events)
.filter(Events.time_fired < batch_purge_before)
.delete(synchronize_session=False)
)
_LOGGER.debug("Deleted %s events", deleted_rows)

# If states or events purging isn't processing the purge_before yet,
# return false, as we are not done yet.
if batch_purge_before != purge_before:
with session_scope(session=instance.get_session()) as session: # type: ignore
Comment thread
cdce8p marked this conversation as resolved.
# Purge a max of MAX_ROWS_TO_PURGE, based on the oldest states or events record
event_ids = _select_event_ids_to_purge(session, purge_before)
state_ids = _select_state_ids_to_purge(session, purge_before, event_ids)
if state_ids:
_disconnect_states_about_to_be_purged(session, state_ids)
_purge_state_ids(session, state_ids)
if event_ids:
_purge_event_ids(session, event_ids)
# If states or events purging isn't processing the purge_before yet,
# return false, as we are not done yet.
_LOGGER.debug("Purging hasn't fully completed yet")
return False

# Recorder runs is small, no need to batch run it
deleted_rows = (
session.query(RecorderRuns)
.filter(RecorderRuns.start < purge_before)
.filter(RecorderRuns.run_id != instance.run_info.run_id)
.delete(synchronize_session=False)
)
_LOGGER.debug("Deleted %s recorder_runs", deleted_rows)

_purge_old_recorder_runs(instance, session, purge_before)
if repack:
# Execute sqlite or postgresql vacuum command to free up space on disk
if instance.engine.driver in ("pysqlite", "postgresql"):
_LOGGER.debug("Vacuuming SQL DB to free space")
instance.engine.execute("VACUUM")
# Optimize mysql / mariadb tables to free up space on disk
elif instance.engine.driver in ("mysqldb", "pymysql"):
_LOGGER.debug("Optimizing SQL DB to free space")
instance.engine.execute("OPTIMIZE TABLE states, events, recorder_runs")

repack_database(instance)
except OperationalError as err:
# Retry when one of the following MySQL errors occurred:
# 1205: Lock wait timeout exceeded; try restarting transaction
Expand All @@ -101,3 +64,78 @@ def purge_old_data(instance, purge_days: int, repack: bool) -> bool:
except SQLAlchemyError as err:
_LOGGER.warning("Error purging history: %s", err)
return True


def _select_event_ids_to_purge(session: Session, purge_before: datetime) -> list:
"""Return a list of event ids to purge."""
events = (
session.query(Events.event_id)
.filter(Events.time_fired < purge_before)
.limit(MAX_ROWS_TO_PURGE)
.all()
)
_LOGGER.debug("Selected %s event ids to remove", len(events))
return [event.event_id for event in events]


def _select_state_ids_to_purge(
session: Session, purge_before: datetime, event_ids: list
) -> list:
"""Return a list of state ids to purge."""
if not event_ids:
return []
states = (
session.query(States.state_id)
.filter(States.last_updated < purge_before)
.filter(States.event_id.in_(event_ids))
.all()
)
_LOGGER.debug("Selected %s state ids to remove", len(states))
return [state.state_id for state in states]


def _disconnect_states_about_to_be_purged(session: Session, state_ids: list) -> None:
Comment thread
cdce8p marked this conversation as resolved.
# Update old_state_id to NULL before deleting to ensure
# the delete does not fail due to a foreign key constraint
# since some databases (MSSQL) cannot do the ON DELETE SET NULL
# for us.
disconnected_rows = (
session.query(States)
.filter(States.old_state_id.in_(state_ids))
Comment thread
bdraco marked this conversation as resolved.
.update({"old_state_id": None}, synchronize_session=False)
)
_LOGGER.debug("Updated %s states to remove old_state_id", disconnected_rows)


def _purge_state_ids(session: Session, state_ids: list) -> None:
"""Delete by state id."""
deleted_rows = (
session.query(States)
.filter(States.state_id.in_(state_ids))
Comment thread
bdraco marked this conversation as resolved.
.delete(synchronize_session=False)
)
_LOGGER.debug("Deleted %s states", deleted_rows)


def _purge_event_ids(session: Session, event_ids: list) -> None:
"""Delete by event id."""
deleted_rows = (
session.query(Events)
.filter(Events.event_id.in_(event_ids))
Comment thread
bdraco marked this conversation as resolved.
.delete(synchronize_session=False)
)
_LOGGER.debug("Deleted %s events", deleted_rows)


def _purge_old_recorder_runs(
instance: Recorder, session: Session, purge_before: datetime
) -> None:
"""Purge all old recorder runs."""
# Recorder runs is small, no need to batch run it
deleted_rows = (
session.query(RecorderRuns)
.filter(RecorderRuns.start < purge_before)
.filter(RecorderRuns.run_id != instance.run_info.run_id)
.delete(synchronize_session=False)
)
_LOGGER.debug("Deleted %s recorder_runs", deleted_rows)
35 changes: 35 additions & 0 deletions homeassistant/components/recorder/repack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""Purge repack helper."""
from __future__ import annotations

import logging
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from . import Recorder

_LOGGER = logging.getLogger(__name__)


def repack_database(instance: Recorder) -> None:
"""Repack based on engine type."""

# Execute sqlite command to free up space on disk
if instance.engine.dialect.name == "sqlite":
_LOGGER.debug("Vacuuming SQL DB to free space")
instance.engine.execute("VACUUM")
return

# Execute postgresql vacuum command to free up space on disk
if instance.engine.dialect.name == "postgresql":
_LOGGER.debug("Vacuuming SQL DB to free space")
with instance.engine.connect().execution_options(
isolation_level="AUTOCOMMIT"
) as conn:
conn.execute("VACUUM")
return

# Optimize mysql / mariadb tables to free up space on disk
if instance.engine.dialect.name == "mysql":
_LOGGER.debug("Optimizing SQL DB to free space")
instance.engine.execute("OPTIMIZE TABLE states, events, recorder_runs")
return
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ warn_redundant_casts = true
warn_unused_configs = true


[mypy-homeassistant.block_async_io,homeassistant.bootstrap,homeassistant.components,homeassistant.config_entries,homeassistant.config,homeassistant.const,homeassistant.core,homeassistant.data_entry_flow,homeassistant.exceptions,homeassistant.__init__,homeassistant.loader,homeassistant.__main__,homeassistant.requirements,homeassistant.runner,homeassistant.setup,homeassistant.util,homeassistant.auth.*,homeassistant.components.automation.*,homeassistant.components.binary_sensor.*,homeassistant.components.bond.*,homeassistant.components.calendar.*,homeassistant.components.cover.*,homeassistant.components.device_automation.*,homeassistant.components.frontend.*,homeassistant.components.geo_location.*,homeassistant.components.group.*,homeassistant.components.history.*,homeassistant.components.http.*,homeassistant.components.huawei_lte.*,homeassistant.components.hyperion.*,homeassistant.components.image_processing.*,homeassistant.components.integration.*,homeassistant.components.light.*,homeassistant.components.lock.*,homeassistant.components.mailbox.*,homeassistant.components.media_player.*,homeassistant.components.notify.*,homeassistant.components.number.*,homeassistant.components.persistent_notification.*,homeassistant.components.proximity.*,homeassistant.components.remote.*,homeassistant.components.scene.*,homeassistant.components.sensor.*,homeassistant.components.slack.*,homeassistant.components.sun.*,homeassistant.components.switch.*,homeassistant.components.systemmonitor.*,homeassistant.components.tts.*,homeassistant.components.vacuum.*,homeassistant.components.water_heater.*,homeassistant.components.weather.*,homeassistant.components.websocket_api.*,homeassistant.components.zone.*,homeassistant.components.zwave_js.*,homeassistant.helpers.*,homeassistant.scripts.*,homeassistant.util.*,tests.components.hyperion.*]
[mypy-homeassistant.block_async_io,homeassistant.bootstrap,homeassistant.components,homeassistant.config_entries,homeassistant.config,homeassistant.const,homeassistant.core,homeassistant.data_entry_flow,homeassistant.exceptions,homeassistant.__init__,homeassistant.loader,homeassistant.__main__,homeassistant.requirements,homeassistant.runner,homeassistant.setup,homeassistant.util,homeassistant.auth.*,homeassistant.components.automation.*,homeassistant.components.binary_sensor.*,homeassistant.components.bond.*,homeassistant.components.calendar.*,homeassistant.components.cover.*,homeassistant.components.device_automation.*,homeassistant.components.frontend.*,homeassistant.components.geo_location.*,homeassistant.components.group.*,homeassistant.components.history.*,homeassistant.components.http.*,homeassistant.components.huawei_lte.*,homeassistant.components.hyperion.*,homeassistant.components.image_processing.*,homeassistant.components.integration.*,homeassistant.components.light.*,homeassistant.components.lock.*,homeassistant.components.mailbox.*,homeassistant.components.media_player.*,homeassistant.components.notify.*,homeassistant.components.number.*,homeassistant.components.persistent_notification.*,homeassistant.components.proximity.*,homeassistant.components.recorder.purge,homeassistant.components.recorder.repack,homeassistant.components.remote.*,homeassistant.components.scene.*,homeassistant.components.sensor.*,homeassistant.components.slack.*,homeassistant.components.sun.*,homeassistant.components.switch.*,homeassistant.components.systemmonitor.*,homeassistant.components.tts.*,homeassistant.components.vacuum.*,homeassistant.components.water_heater.*,homeassistant.components.weather.*,homeassistant.components.websocket_api.*,homeassistant.components.zone.*,homeassistant.components.zwave_js.*,homeassistant.helpers.*,homeassistant.scripts.*,homeassistant.util.*,tests.components.hyperion.*]
strict = true
ignore_errors = false
warn_unreachable = true
Expand Down
Loading