Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 42 additions & 81 deletions homeassistant/components/recorder/purge.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,32 @@
import logging
from typing import TYPE_CHECKING

from sqlalchemy import func
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.expression import distinct

from homeassistant.const import EVENT_STATE_CHANGED

from .const import MAX_ROWS_TO_PURGE, SupportedDialect
from .models import (
EventData,
Events,
RecorderRuns,
StateAttributes,
States,
StatisticsRuns,
StatisticsShortTerm,
from .models import Events, StateAttributes, States
from .queries import (
attributes_ids_exist_in_states,
attributes_ids_exist_in_states_sqlite,
data_ids_exist_in_events,
data_ids_exist_in_events_sqlite,
delete_event_data_rows,
delete_event_rows,
delete_recorder_runs_rows,
delete_states_attributes_rows,
delete_states_rows,
delete_statistics_runs_rows,
delete_statistics_short_term_rows,
disconnect_states_rows,
find_events_to_purge,
find_latest_statistics_runs_run_id,
find_short_term_statistics_to_purge,
find_states_to_purge,
find_statistics_runs_to_purge,
)
from .queries import attributes_ids_exist_in_states, data_ids_exist_in_events
from .repack import repack_database
from .util import retryable_database_job, session_scope

Expand Down Expand Up @@ -101,19 +110,9 @@ def _select_event_state_attributes_ids_data_ids_to_purge(
session: Session, purge_before: datetime
) -> tuple[set[int], set[int], set[int], set[int]]:
"""Return a list of event, state, and attribute ids to purge."""
events = (
session.query(Events.event_id, Events.data_id)
.filter(Events.time_fired < purge_before)
.limit(MAX_ROWS_TO_PURGE)
.all()
)
events = session.execute(find_events_to_purge(purge_before)).all()
_LOGGER.debug("Selected %s event ids to remove", len(events))
states = (
session.query(States.state_id, States.attributes_id)
.filter(States.last_updated < purge_before)
.limit(MAX_ROWS_TO_PURGE)
.all()
)
states = session.execute(find_states_to_purge(purge_before)).all()
_LOGGER.debug("Selected %s state ids to remove", len(states))
event_ids = set()
state_ids = set()
Expand Down Expand Up @@ -152,9 +151,9 @@ def _select_unused_attributes_ids(
#
seen_ids = {
state[0]
for state in session.query(distinct(States.attributes_id))
.filter(States.attributes_id.in_(attributes_ids))
.all()
for state in session.execute(
attributes_ids_exist_in_states_sqlite(attributes_ids)
).all()
}
else:
#
Expand Down Expand Up @@ -210,9 +209,9 @@ def _select_unused_event_data_ids(
if using_sqlite:
seen_ids = {
state[0]
for state in session.query(distinct(Events.data_id))
.filter(Events.data_id.in_(data_ids))
.all()
for state in session.execute(
data_ids_exist_in_events_sqlite(data_ids)
).all()
}
else:
seen_ids = set()
Expand All @@ -234,16 +233,11 @@ def _select_statistics_runs_to_purge(
session: Session, purge_before: datetime
) -> list[int]:
"""Return a list of statistic runs to purge, but take care to keep the newest run."""
statistic_runs = (
session.query(StatisticsRuns.run_id)
.filter(StatisticsRuns.start < purge_before)
.limit(MAX_ROWS_TO_PURGE)
.all()
)
statistic_runs = session.execute(find_statistics_runs_to_purge(purge_before)).all()
statistic_runs_list = [run.run_id for run in statistic_runs]
# Exclude the newest statistics run
if (
last_run := session.query(func.max(StatisticsRuns.run_id)).scalar()
last_run := session.execute(find_latest_statistics_runs_run_id()).scalar()
) and last_run in statistic_runs_list:
statistic_runs_list.remove(last_run)

Expand All @@ -255,12 +249,9 @@ def _select_short_term_statistics_to_purge(
session: Session, purge_before: datetime
) -> list[int]:
"""Return a list of short term statistics to purge."""
statistics = (
session.query(StatisticsShortTerm.id)
.filter(StatisticsShortTerm.start < purge_before)
.limit(MAX_ROWS_TO_PURGE)
.all()
)
statistics = session.execute(
find_short_term_statistics_to_purge(purge_before)
).all()
_LOGGER.debug("Selected %s short term statistics to remove", len(statistics))
return [statistic.id for statistic in statistics]

Expand All @@ -272,18 +263,10 @@ def _purge_state_ids(instance: Recorder, session: Session, state_ids: set[int])
# the delete does not fail due to a foreign key constraint
# since some databases (MSSQL) cannot do the ON DELETE SET NULL
# for us.
disconnected_rows = (
session.query(States)
.filter(States.old_state_id.in_(state_ids))
.update({"old_state_id": None}, synchronize_session=False)
)
disconnected_rows = session.execute(disconnect_states_rows(state_ids))
_LOGGER.debug("Updated %s states to remove old_state_id", disconnected_rows)

deleted_rows = (
session.query(States)
.filter(States.state_id.in_(state_ids))
.delete(synchronize_session=False)
)
deleted_rows = session.execute(delete_states_rows(state_ids))
_LOGGER.debug("Deleted %s states", deleted_rows)

# Evict eny entries in the old_states cache referring to a purged state
Expand Down Expand Up @@ -348,12 +331,7 @@ def _purge_attributes_ids(
instance: Recorder, session: Session, attributes_ids: set[int]
) -> None:
"""Delete old attributes ids."""

deleted_rows = (
session.query(StateAttributes)
.filter(StateAttributes.attributes_id.in_(attributes_ids))
.delete(synchronize_session=False)
)
deleted_rows = session.execute(delete_states_attributes_rows(attributes_ids))
_LOGGER.debug("Deleted %s attribute states", deleted_rows)

# Evict any entries in the state_attributes_ids cache referring to a purged state
Expand All @@ -365,11 +343,7 @@ def _purge_event_data_ids(
) -> None:
"""Delete old event data ids."""

deleted_rows = (
session.query(EventData)
.filter(EventData.data_id.in_(data_ids))
.delete(synchronize_session=False)
)
deleted_rows = session.execute(delete_event_data_rows(data_ids))
_LOGGER.debug("Deleted %s data events", deleted_rows)

# Evict any entries in the event_data_ids cache referring to a purged state
Expand All @@ -378,33 +352,23 @@ def _purge_event_data_ids(

def _purge_statistics_runs(session: Session, statistics_runs: list[int]) -> None:
"""Delete by run_id."""
deleted_rows = (
session.query(StatisticsRuns)
.filter(StatisticsRuns.run_id.in_(statistics_runs))
.delete(synchronize_session=False)
)
deleted_rows = session.execute(delete_statistics_runs_rows(statistics_runs))
_LOGGER.debug("Deleted %s statistic runs", deleted_rows)


def _purge_short_term_statistics(
session: Session, short_term_statistics: list[int]
) -> None:
"""Delete by id."""
deleted_rows = (
session.query(StatisticsShortTerm)
.filter(StatisticsShortTerm.id.in_(short_term_statistics))
.delete(synchronize_session=False)
deleted_rows = session.execute(
delete_statistics_short_term_rows(short_term_statistics)
)
_LOGGER.debug("Deleted %s short term statistics", deleted_rows)


def _purge_event_ids(session: Session, event_ids: Iterable[int]) -> None:
"""Delete by event id."""
deleted_rows = (
session.query(Events)
.filter(Events.event_id.in_(event_ids))
.delete(synchronize_session=False)
)
deleted_rows = session.execute(delete_event_rows(event_ids))
_LOGGER.debug("Deleted %s events", deleted_rows)


Expand All @@ -413,11 +377,8 @@ def _purge_old_recorder_runs(
) -> None:
"""Purge all old recorder runs."""
# Recorder runs is small, no need to batch run it
deleted_rows = (
session.query(RecorderRuns)
.filter(RecorderRuns.start < purge_before)
.filter(RecorderRuns.run_id != instance.run_history.current.run_id)
.delete(synchronize_session=False)
deleted_rows = session.execute(
delete_recorder_runs_rows(purge_before, instance.run_history.current.run_id)
)
_LOGGER.debug("Deleted %s recorder_runs", deleted_rows)

Expand Down
Loading