Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions homeassistant/components/recorder/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,13 @@

ATTR_KEEP_DAYS = "keep_days"
ATTR_REPACK = "repack"
ATTR_APPLY_FILTER = "apply_filter"

SERVICE_PURGE_SCHEMA = vol.Schema(
{
vol.Optional(ATTR_KEEP_DAYS): cv.positive_int,
vol.Optional(ATTR_REPACK, default=False): cv.boolean,
vol.Optional(ATTR_APPLY_FILTER, default=False): cv.boolean,
}
)
SERVICE_ENABLE_SCHEMA = vol.Schema({})
Expand Down Expand Up @@ -227,6 +229,7 @@ class PurgeTask(NamedTuple):

keep_days: int
repack: bool
apply_filter: bool


class WaitTask:
Expand Down Expand Up @@ -309,8 +312,9 @@ def do_adhoc_purge(self, **kwargs):
"""Trigger an adhoc purge retaining keep_days worth of data."""
keep_days = kwargs.get(ATTR_KEEP_DAYS, self.keep_days)
repack = kwargs.get(ATTR_REPACK)
apply_filter = kwargs.get(ATTR_APPLY_FILTER)

self.queue.put(PurgeTask(keep_days, repack))
self.queue.put(PurgeTask(keep_days, repack, apply_filter))

def run(self):
"""Start processing events to save."""
Expand Down Expand Up @@ -364,7 +368,9 @@ def notify_hass_started(event):
@callback
def async_purge(now):
"""Trigger the purge."""
self.queue.put(PurgeTask(self.keep_days, repack=False))
self.queue.put(
PurgeTask(self.keep_days, repack=False, apply_filter=False)
)

# Purge every night at 4:12am
self.hass.helpers.event.track_time_change(
Expand Down Expand Up @@ -425,8 +431,12 @@ def _process_one_event(self, event):
"""Process one event."""
if isinstance(event, PurgeTask):
# Schedule a new purge task if this one didn't finish
if not purge.purge_old_data(self, event.keep_days, event.repack):
self.queue.put(PurgeTask(event.keep_days, event.repack))
if not purge.purge_old_data(
self, event.keep_days, event.repack, event.apply_filter
):
self.queue.put(
PurgeTask(event.keep_days, event.repack, event.apply_filter)
)
return
if isinstance(event, WaitTask):
self._queue_watch.set()
Expand Down
75 changes: 74 additions & 1 deletion homeassistant/components/recorder/purge.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from sqlalchemy.exc import OperationalError, SQLAlchemyError
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.expression import distinct

import homeassistant.util.dt as dt_util

Expand All @@ -22,7 +23,9 @@
_LOGGER = logging.getLogger(__name__)


def purge_old_data(instance: Recorder, purge_days: int, repack: bool) -> bool:
def purge_old_data(
instance: Recorder, purge_days: int, repack: bool, apply_filter: bool = False
) -> bool:
"""Purge events and states older than purge_days ago.

Cleans up an timeframe of an hour, based on the oldest record.
Expand All @@ -45,6 +48,9 @@ def purge_old_data(instance: Recorder, purge_days: int, repack: bool) -> bool:
# return false, as we are not done yet.
_LOGGER.debug("Purging hasn't fully completed yet")
return False
if apply_filter and _purge_filtered_data(instance, session) is False:
_LOGGER.debug("Cleanup filtered data hasn't fully completed yet")
return False
_purge_old_recorder_runs(instance, session, purge_before)
if repack:
repack_database(instance)
Expand Down Expand Up @@ -140,3 +146,70 @@ def _purge_old_recorder_runs(
.delete(synchronize_session=False)
)
_LOGGER.debug("Deleted %s recorder_runs", deleted_rows)


def _purge_filtered_data(instance: Recorder, session: Session) -> bool:
    """Remove filtered states and events that shouldn't be in the database.

    Purges at most one batch per call: returns False when filtered rows were
    found (a batch was handed to the purge helpers and more may remain), and
    True once neither filtered states nor filtered events are left.
    """
    _LOGGER.debug("Cleanup filtered data")

    # Check if excluded entity_ids are in database
    excluded_entity_ids: list[str] = [
        entity_id
        for (entity_id,) in session.query(distinct(States.entity_id)).all()
        if not instance.entity_filter(entity_id)
    ]
    if excluded_entity_ids:
        _purge_filtered_states(session, excluded_entity_ids)
        return False

    # Check if excluded event_types are in database
    excluded_event_types: list[str] = [
        event_type
        for (event_type,) in session.query(distinct(Events.event_type)).all()
        if event_type in instance.exclude_t
    ]
    if excluded_event_types:
        _purge_filtered_events(session, excluded_event_types)
        return False

    return True


def _purge_filtered_states(session: Session, excluded_entity_ids: list[str]) -> None:
    """Remove filtered states and linked events.

    Deletes at most MAX_ROWS_TO_PURGE states per call; the caller invokes this
    repeatedly until no matching rows remain.
    """
    rows = (
        session.query(States.state_id, States.event_id)
        .filter(States.entity_id.in_(excluded_entity_ids))
        .limit(MAX_ROWS_TO_PURGE)
        .all()
    )
    if not rows:
        # Guard the empty case: zip(*rows) below would raise ValueError on
        # an empty sequence (e.g. rows vanished between query and purge).
        return
    state_ids: list[int]
    linked_event_ids: list[int | None]
    state_ids, linked_event_ids = zip(*rows)
    # event_id may be NULL; only purge the real linked event rows.
    event_ids: list[int] = [id_ for id_ in linked_event_ids if id_ is not None]
    _LOGGER.debug(
        "Selected %s state_ids to remove that should be filtered", len(state_ids)
    )
    _purge_state_ids(session, state_ids)
    _purge_event_ids(session, event_ids)


def _purge_filtered_events(session: Session, excluded_event_types: list[str]) -> None:
    """Remove filtered events and linked states."""
    # Fetch one batch (up to MAX_ROWS_TO_PURGE) of event ids to delete.
    event_ids: list[int] = [
        row.event_id
        for row in session.query(Events.event_id)
        .filter(Events.event_type.in_(excluded_event_types))
        .limit(MAX_ROWS_TO_PURGE)
        .all()
    ]
    _LOGGER.debug(
        "Selected %s event_ids to remove that should be filtered", len(event_ids)
    )
    # States that reference these events must go as well.
    state_ids: list[int] = [
        row.state_id
        for row in session.query(States.state_id)
        .filter(States.event_id.in_(event_ids))
        .all()
    ]
    _purge_state_ids(session, state_ids)
    _purge_event_ids(session, event_ids)
8 changes: 8 additions & 0 deletions homeassistant/components/recorder/services.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ purge:
selector:
boolean:

apply_filter:
name: Apply filter
    description: Apply entity_id and event_type filter in addition to time-based purge.
example: true
default: false
selector:
boolean:

disable:
description: Stop the recording of events and state changes

Expand Down
Loading