Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion homeassistant/components/recorder/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,4 @@ async def _process_recorder_platform(
) -> None:
"""Process a recorder platform."""
instance: Recorder = hass.data[DATA_INSTANCE]
instance.queue.put(AddRecorderPlatformTask(domain, platform))
instance.queue_task(AddRecorderPlatformTask(domain, platform))
170 changes: 77 additions & 93 deletions homeassistant/components/recorder/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
MATCH_ALL,
)
from homeassistant.core import CALLBACK_TYPE, CoreState, Event, HomeAssistant, callback
from homeassistant.helpers.entityfilter import generate_filter
from homeassistant.helpers.event import (
async_track_time_change,
async_track_time_interval,
Expand All @@ -38,9 +37,6 @@

from . import migration, statistics
from .const import (
ATTR_APPLY_FILTER,
ATTR_KEEP_DAYS,
ATTR_REPACK,
DB_WORKER_PREFIX,
KEEPALIVE_TIME,
MAX_QUEUE_BACKLOG,
Expand Down Expand Up @@ -70,7 +66,6 @@
ExternalStatisticsTask,
KeepAliveTask,
PerodicCleanupTask,
PurgeEntitiesTask,
PurgeTask,
RecorderTask,
StatisticsTask,
Expand Down Expand Up @@ -112,6 +107,7 @@

COMMIT_TASK = CommitTask()
KEEP_ALIVE_TASK = KeepAliveTask()
WAIT_TASK = WaitTask()

DB_LOCK_TIMEOUT = 30
DB_LOCK_QUEUE_CHECK_TIMEOUT = 1
Expand Down Expand Up @@ -152,7 +148,7 @@ def __init__(
self.keep_days = keep_days
self._hass_started: asyncio.Future[object] = asyncio.Future()
self.commit_interval = commit_interval
self.queue: queue.SimpleQueue[RecorderTask] = queue.SimpleQueue()
self._queue: queue.SimpleQueue[RecorderTask] = queue.SimpleQueue()
self.db_url = uri
self.db_max_retries = db_max_retries
self.db_retry_wait = db_retry_wait
Expand All @@ -175,21 +171,42 @@ def __init__(
self.event_session: Session | None = None
self.get_session: Callable[[], Session] | None = None
self._completed_first_database_setup: bool | None = None
self._event_listener: CALLBACK_TYPE | None = None
self.async_migration_event = asyncio.Event()
self.migration_in_progress = False
self._queue_watcher: CALLBACK_TYPE | None = None
self._db_supports_row_number = True
self._database_lock_task: DatabaseLockTask | None = None
self._db_executor: DBInterruptibleThreadPoolExecutor | None = None
self._exclude_attributes_by_domain = exclude_attributes_by_domain

self._event_listener: CALLBACK_TYPE | None = None
self._queue_watcher: CALLBACK_TYPE | None = None
self._keep_alive_listener: CALLBACK_TYPE | None = None
self._commit_listener: CALLBACK_TYPE | None = None
self._periodic_listener: CALLBACK_TYPE | None = None
self._nightly_listener: CALLBACK_TYPE | None = None
self.enabled = True

@property
def backlog(self) -> int:
"""Return the number of items in the recorder backlog."""
return self._queue.qsize()

@property
def _using_file_sqlite(self) -> bool:
"""Short version to check if we are using sqlite3 as a file."""
return self.db_url != SQLITE_URL_PREFIX and self.db_url.startswith(
SQLITE_URL_PREFIX
)

@property
def recording(self) -> bool:
"""Return if the recorder is recording."""
return self._event_listener is not None

def queue_task(self, task: RecorderTask) -> None:
"""Add a task to the recorder queue."""
self._queue.put(task)

def set_enable(self, enable: bool) -> None:
"""Enable or disable recording events and states."""
self.enabled = enable
Expand Down Expand Up @@ -222,7 +239,7 @@ def async_initialize(self) -> None:
def _async_keep_alive(self, now: datetime) -> None:
"""Queue a keep alive."""
if self._event_listener:
self.queue.put(KEEP_ALIVE_TASK)
self.queue_task(KEEP_ALIVE_TASK)

@callback
def _async_commit(self, now: datetime) -> None:
Expand All @@ -232,7 +249,7 @@ def _async_commit(self, now: datetime) -> None:
and not self._database_lock_task
and self._event_session_has_pending_writes()
):
self.queue.put(COMMIT_TASK)
self.queue_task(COMMIT_TASK)

@callback
def async_add_executor_job(
Expand All @@ -253,7 +270,7 @@ def _async_check_queue(self, *_: Any) -> None:

The queue grows during migration or if something really goes wrong.
"""
size = self.queue.qsize()
size = self.backlog
_LOGGER.debug("Recorder queue size is: %s", size)
if size <= MAX_QUEUE_BACKLOG:
return
Expand Down Expand Up @@ -314,73 +331,52 @@ def _async_event_filter(self, event: Event) -> bool:
# Unknown what it is.
return True

def do_adhoc_purge(self, **kwargs: Any) -> None:
"""Trigger an adhoc purge retaining keep_days worth of data."""
keep_days = kwargs.get(ATTR_KEEP_DAYS, self.keep_days)
repack = cast(bool, kwargs[ATTR_REPACK])
apply_filter = cast(bool, kwargs[ATTR_APPLY_FILTER])

purge_before = dt_util.utcnow() - timedelta(days=keep_days)
self.queue.put(PurgeTask(purge_before, repack, apply_filter))

def do_adhoc_purge_entities(
self, entity_ids: set[str], domains: list[str], entity_globs: list[str]
) -> None:
"""Trigger an adhoc purge of requested entities."""
entity_filter = generate_filter(domains, list(entity_ids), [], [], entity_globs)
self.queue.put(PurgeEntitiesTask(entity_filter))

def do_adhoc_statistics(self, **kwargs: Any) -> None:
"""Trigger an adhoc statistics run."""
if not (start := kwargs.get("start")):
start = statistics.get_start_time()
self.queue.put(StatisticsTask(start))
self.queue_task(StatisticsTask(start))

Comment on lines 334 to 339
Copy link
Copy Markdown
Member Author

@bdraco bdraco May 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We currently only call this in tests. We should probably move this into the test code instead in a future PR.

@callback
def async_register(self) -> None:
"""Post connection initialize."""
def _empty_queue(self, event: Event) -> None:
"""Empty the queue if it's still present at final write."""

def _empty_queue(event: Event) -> None:
"""Empty the queue if its still present at final write."""

# If the queue is full of events to be processed because
# the database is so broken that every event results in a retry
# we will never be able to get though the events to shutdown in time.
#
# We drain all the events in the queue and then insert
# an empty one to ensure the next thing the recorder sees
# is a request to shutdown.
while True:
try:
self.queue.get_nowait()
except queue.Empty:
break
self.queue.put(StopTask())

self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_FINAL_WRITE, _empty_queue)

async def _async_shutdown(event: Event) -> None:
"""Shut down the Recorder."""
if not self._hass_started.done():
self._hass_started.set_result(SHUTDOWN_TASK)
self.queue.put(StopTask())
self._async_stop_listeners()
await self.hass.async_add_executor_job(self.join)
# If the queue is full of events to be processed because
# the database is so broken that every event results in a retry,
# we will never be able to get through the events to shut down in time.
#
# We drain all the events in the queue and then insert
# a stop task to ensure the next thing the recorder sees
# is a request to shut down.
while True:
try:
self._queue.get_nowait()
except queue.Empty:
break
self.queue_task(StopTask())

async def _async_shutdown(self, event: Event) -> None:
"""Shut down the Recorder."""
if not self._hass_started.done():
self._hass_started.set_result(SHUTDOWN_TASK)
self.queue_task(StopTask())
self._async_stop_listeners()
await self.hass.async_add_executor_job(self.join)

self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _async_shutdown)
@callback
def _async_hass_started(self, event: Event) -> None:
"""Notify that hass has started."""
self._hass_started.set_result(None)

@callback
def async_register(self) -> None:
"""Post connection initialize."""
bus = self.hass.bus
bus.async_listen_once(EVENT_HOMEASSISTANT_FINAL_WRITE, self._empty_queue)
bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self._async_shutdown)
if self.hass.state == CoreState.running:
self._hass_started.set_result(None)
return

@callback
def _async_hass_started(event: Event) -> None:
"""Notify that hass has started."""
self._hass_started.set_result(None)

self.hass.bus.async_listen_once(
EVENT_HOMEASSISTANT_STARTED, _async_hass_started
)
bus.async_listen_once(EVENT_HOMEASSISTANT_STARTED, self._async_hass_started)

@callback
def async_connection_failed(self) -> None:
Expand Down Expand Up @@ -414,9 +410,9 @@ def async_nightly_tasks(self, now: datetime) -> None:
# until after the database is vacuumed
repack = self.auto_repack and is_second_sunday(now)
purge_before = dt_util.utcnow() - timedelta(days=self.keep_days)
self.queue.put(PurgeTask(purge_before, repack=repack, apply_filter=False))
self.queue_task(PurgeTask(purge_before, repack=repack, apply_filter=False))
else:
self.queue.put(PerodicCleanupTask())
self.queue_task(PerodicCleanupTask())

@callback
def async_periodic_statistics(self, now: datetime) -> None:
Expand All @@ -425,33 +421,33 @@ def async_periodic_statistics(self, now: datetime) -> None:
Short term statistics run every 5 minutes
"""
start = statistics.get_start_time()
self.queue.put(StatisticsTask(start))
self.queue_task(StatisticsTask(start))

@callback
def async_adjust_statistics(
self, statistic_id: str, start_time: datetime, sum_adjustment: float
) -> None:
"""Adjust statistics."""
self.queue.put(AdjustStatisticsTask(statistic_id, start_time, sum_adjustment))
self.queue_task(AdjustStatisticsTask(statistic_id, start_time, sum_adjustment))

@callback
def async_clear_statistics(self, statistic_ids: list[str]) -> None:
"""Clear statistics for a list of statistic_ids."""
self.queue.put(ClearStatisticsTask(statistic_ids))
self.queue_task(ClearStatisticsTask(statistic_ids))

@callback
def async_update_statistics_metadata(
self, statistic_id: str, unit_of_measurement: str | None
) -> None:
"""Update statistics metadata for a statistic_id."""
self.queue.put(UpdateStatisticsMetadataTask(statistic_id, unit_of_measurement))
self.queue_task(UpdateStatisticsMetadataTask(statistic_id, unit_of_measurement))

@callback
def async_external_statistics(
self, metadata: StatisticMetaData, stats: Iterable[StatisticData]
) -> None:
"""Schedule external statistics."""
self.queue.put(ExternalStatisticsTask(metadata, stats))
self.queue_task(ExternalStatisticsTask(metadata, stats))

@callback
def using_sqlite(self) -> bool:
Expand Down Expand Up @@ -553,7 +549,7 @@ def _run_event_loop(self) -> None:
# has changed. This reduces the disk io.
self.stop_requested = False
while not self.stop_requested:
task = self.queue.get()
task = self._queue.get()
_LOGGER.debug("Processing task: %s", task)
try:
self._process_one_task_or_recover(task)
Expand Down Expand Up @@ -643,7 +639,7 @@ def _async_set_database_locked(task: DatabaseLockTask) -> None:
# Notify that lock is being held, wait until database can be used again.
self.hass.add_job(_async_set_database_locked, task)
while not task.database_unlock.wait(timeout=DB_LOCK_QUEUE_CHECK_TIMEOUT):
if self.queue.qsize() > MAX_QUEUE_BACKLOG * 0.9:
if self.backlog > MAX_QUEUE_BACKLOG * 0.9:
_LOGGER.warning(
"Database queue backlog reached more than 90% of maximum queue "
"length while waiting for backup to finish; recorder will now "
Expand All @@ -654,7 +650,7 @@ def _async_set_database_locked(task: DatabaseLockTask) -> None:
break
_LOGGER.info(
"Database queue backlog reached %d entries during backup",
self.queue.qsize(),
self.backlog,
)

def _process_one_event(self, event: Event) -> None:
Expand Down Expand Up @@ -908,7 +904,7 @@ def _send_keep_alive(self) -> None:
@callback
def event_listener(self, event: Event) -> None:
"""Listen for new events and put them in the process queue."""
self.queue.put(EventTask(event))
self.queue_task(EventTask(event))

def block_till_done(self) -> None:
"""Block till all events processed.
Expand All @@ -923,7 +919,7 @@ def block_till_done(self) -> None:
is in the database.
"""
self._queue_watch.clear()
self.queue.put(WaitTask())
self.queue_task(WAIT_TASK)
self._queue_watch.wait()

async def lock_database(self) -> bool:
Expand All @@ -940,7 +936,7 @@ async def lock_database(self) -> bool:

database_locked = asyncio.Event()
task = DatabaseLockTask(database_locked, threading.Event(), False)
self.queue.put(task)
self.queue_task(task)
try:
await asyncio.wait_for(database_locked.wait(), timeout=DB_LOCK_TIMEOUT)
except asyncio.TimeoutError as err:
Expand Down Expand Up @@ -1013,13 +1009,6 @@ def setup_recorder_connection(
self.get_session = scoped_session(sessionmaker(bind=self.engine, future=True))
_LOGGER.debug("Connected to recorder database")

@property
def _using_file_sqlite(self) -> bool:
"""Short version to check if we are using sqlite3 as a file."""
return self.db_url != SQLITE_URL_PREFIX and self.db_url.startswith(
SQLITE_URL_PREFIX
)

def _close_connection(self) -> None:
"""Close the connection."""
assert self.engine is not None
Expand Down Expand Up @@ -1053,7 +1042,7 @@ def _schedule_compile_missing_statistics(self, session: Session) -> None:
while start < last_period:
end = start + timedelta(minutes=5)
_LOGGER.debug("Compiling missing statistics for %s-%s", start, end)
self.queue.put(StatisticsTask(start))
self.queue_task(StatisticsTask(start))
start = end

def _end_session(self) -> None:
Expand All @@ -1075,8 +1064,3 @@ def _shutdown(self) -> None:
self._stop_executor()
self._end_session()
self._close_connection()

@property
def recording(self) -> bool:
"""Return if the recorder is recording."""
return self._event_listener is not None
Loading