diff --git a/homeassistant/components/recorder/__init__.py b/homeassistant/components/recorder/__init__.py index f93d965a4b9d0..10b987b04f7b5 100644 --- a/homeassistant/components/recorder/__init__.py +++ b/homeassistant/components/recorder/__init__.py @@ -3,7 +3,7 @@ import asyncio import concurrent.futures -from datetime import datetime +from datetime import datetime, timedelta import logging import queue import sqlite3 @@ -12,6 +12,7 @@ from typing import Any, Callable, NamedTuple from sqlalchemy import create_engine, event as sqlalchemy_event, exc, select +from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm import scoped_session, sessionmaker from sqlalchemy.pool import StaticPool import voluptuous as vol @@ -20,7 +21,7 @@ from homeassistant.const import ( ATTR_ENTITY_ID, CONF_EXCLUDE, - EVENT_HOMEASSISTANT_START, + EVENT_HOMEASSISTANT_STARTED, EVENT_HOMEASSISTANT_STOP, EVENT_STATE_CHANGED, EVENT_TIME_CHANGED, @@ -33,6 +34,7 @@ INCLUDE_EXCLUDE_FILTER_SCHEMA_INNER, convert_include_exclude_filter, ) +from homeassistant.helpers.event import async_track_time_interval, track_time_change from homeassistant.helpers.typing import ConfigType import homeassistant.util.dt as dt_util @@ -56,6 +58,8 @@ ATTR_REPACK = "repack" ATTR_APPLY_FILTER = "apply_filter" +MAX_QUEUE_BACKLOG = 30000 + SERVICE_PURGE_SCHEMA = vol.Schema( { vol.Optional(ATTR_KEEP_DAYS): cv.positive_int, @@ -99,6 +103,7 @@ { vol.Optional(DOMAIN, default=dict): vol.All( cv.deprecated(CONF_PURGE_INTERVAL), + cv.deprecated(CONF_DB_INTEGRITY_CHECK), FILTER_SCHEMA.extend( { vol.Optional(CONF_AUTO_PURGE, default=True): cv.boolean, @@ -176,11 +181,9 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: commit_interval = conf[CONF_COMMIT_INTERVAL] db_max_retries = conf[CONF_DB_MAX_RETRIES] db_retry_wait = conf[CONF_DB_RETRY_WAIT] - db_integrity_check = conf[CONF_DB_INTEGRITY_CHECK] - - db_url = conf.get(CONF_DB_URL) - if not db_url: - db_url = DEFAULT_URL.format(hass_config_path=hass.config.path(DEFAULT_DB_FILE)) + db_url = conf.get(CONF_DB_URL) or DEFAULT_URL.format( + hass_config_path=hass.config.path(DEFAULT_DB_FILE) + ) exclude = conf[CONF_EXCLUDE] exclude_t = exclude.get(CONF_EVENT_TYPES, []) instance = hass.data[DATA_INSTANCE] = Recorder( @@ -193,10 +196,17 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: db_retry_wait=db_retry_wait, entity_filter=entity_filter, exclude_t=exclude_t, - db_integrity_check=db_integrity_check, ) instance.async_initialize() instance.start() + _async_register_services(hass, instance) + + return await instance.async_db_ready + + +@callback +def _async_register_services(hass, instance): + """Register recorder services.""" async def async_handle_purge_service(service): """Handle calls to the purge service.""" @@ -223,8 +233,6 @@ async def async_handle_disable_service(service): schema=SERVICE_DISABLE_SCHEMA, ) - return await instance.async_db_ready - class PurgeTask(NamedTuple): """Object to store information about purge task.""" @@ -252,7 +260,6 @@ def __init__( db_retry_wait: int, entity_filter: Callable[[str], bool], exclude_t: list[str], - db_integrity_check: bool, ) -> None: """Initialize the recorder.""" threading.Thread.__init__(self, name="Recorder") @@ -266,8 +273,8 @@ def __init__( self.db_url = uri self.db_max_retries = db_max_retries self.db_retry_wait = db_retry_wait - self.db_integrity_check = db_integrity_check self.async_db_ready = asyncio.Future() + self.async_recorder_ready = asyncio.Event() self._queue_watch = threading.Event() self.engine: Any = None self.run_info: Any = None @@ -283,6 +290,9 @@ def __init__( self.event_session = None self.get_session = None self._completed_database_setup = None + self._event_listener = None + + self._queue_watcher = None self.enabled = True @@ -293,9 +303,37 @@ def set_enable(self, enable): @callback def async_initialize(self): """Initialize the recorder.""" - self.hass.bus.async_listen( + self._event_listener = self.hass.bus.async_listen( MATCH_ALL, self.event_listener, event_filter=self._async_event_filter ) + self._queue_watcher = async_track_time_interval( + self.hass, self._async_check_queue, timedelta(minutes=10) + ) + + @callback + def _async_check_queue(self, *_): + """Periodic check of the queue size to ensure we do not exaust memory. + + The queue grows during migraton or if something really goes wrong. + """ + size = self.queue.qsize() + _LOGGER.debug("Recorder queue size is: %s", size) + if self.queue.qsize() <= MAX_QUEUE_BACKLOG: + return + _LOGGER.error( + "The recorder queue reached the maximum size of %s; Events are no longer being recorded", + MAX_QUEUE_BACKLOG, + ) + self._stop_queue_watcher_and_event_listener() + + def _stop_queue_watcher_and_event_listener(self): + """Stop watching the queue.""" + if self._queue_watcher: + self._queue_watcher() + self._queue_watcher = None + if self._event_listener: + self._event_listener() + self._event_listener = None @callback def _async_event_filter(self, event): @@ -314,89 +352,152 @@ def do_adhoc_purge(self, **kwargs): self.queue.put(PurgeTask(keep_days, repack, apply_filter)) - def run(self): - """Start processing events to save.""" + @callback + def async_register(self, shutdown_task, hass_started): + """Post connection initialize.""" - if not self._setup_recorder(): - return + def shutdown(event): + """Shut down the Recorder.""" + if not hass_started.done(): + hass_started.set_result(shutdown_task) + self.queue.put(None) + self.join() - shutdown_task = object() - hass_started = concurrent.futures.Future() + self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, shutdown) + + if self.hass.state == CoreState.running: + hass_started.set_result(None) + return @callback - def register(): - """Post connection initialize.""" - self.async_db_ready.set_result(True) + def async_hass_started(event): + """Notify that hass has started.""" + hass_started.set_result(None) - def shutdown(event): - """Shut down the Recorder.""" - if not hass_started.done(): - hass_started.set_result(shutdown_task) - self.queue.put(None) - self.join() + self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STARTED, async_hass_started) - self.hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, shutdown) + @callback + def async_connection_failed(self): + """Connect failed tasks.""" + self.async_db_ready.set_result(False) + persistent_notification.async_create( + self.hass, + "The recorder could not start, check [the logs](/config/logs)", + "Recorder", + ) + self._stop_queue_watcher_and_event_listener() - if self.hass.state == CoreState.running: - hass_started.set_result(None) - else: + @callback + def async_connection_success(self): + """Connect success tasks.""" + self.async_db_ready.set_result(True) - @callback - def notify_hass_started(event): - """Notify that hass has started.""" - hass_started.set_result(None) + @callback + def _async_recorder_ready(self): + """Mark recorder ready.""" + self.async_recorder_ready.set() - self.hass.bus.async_listen_once( - EVENT_HOMEASSISTANT_START, notify_hass_started - ) + @callback + def async_purge(self, now): + """Trigger the purge.""" + self.queue.put(PurgeTask(self.keep_days, repack=False, apply_filter=False)) - self.hass.add_job(register) - result = hass_started.result() + def run(self): + """Start processing events to save.""" + shutdown_task = object() + hass_started = concurrent.futures.Future() + + self.hass.add_job(self.async_register, shutdown_task, hass_started) + + current_version = self._setup_recorder() + + if current_version is None: + self.hass.add_job(self.async_connection_failed) + return + + schema_is_current = migration.schema_is_current(current_version) + if schema_is_current: + self._setup_run() + + self.hass.add_job(self.async_connection_success) # If shutdown happened before Home Assistant finished starting - if result is shutdown_task: + if hass_started.result() is shutdown_task: # Make sure we cleanly close the run if # we restart before startup finishes self._shutdown() return - # Start periodic purge - if self.auto_purge: - - @callback - def async_purge(now): - """Trigger the purge.""" - self.queue.put( - PurgeTask(self.keep_days, repack=False, apply_filter=False) + # We wait to start the migration until startup has finished + # since it can be cpu intensive and we do not want it to compete + # with startup which is also cpu intensive + if not schema_is_current: + if self._migrate_schema_and_setup_run(current_version): + if not self._event_listener: + # If the schema migration takes so longer that the end + # queue watcher safety kicks in because MAX_QUEUE_BACKLOG + # is reached, we need to reinitialize the listener. + self.hass.add_job(self.async_initialize) + else: + persistent_notification.create( + self.hass, + "The database migration failed, check [the logs](/config/logs)." + "Database Migration Failed", + "recorder_database_migration", ) + self._shutdown() + return + # Start periodic purge + if self.auto_purge: # Purge every night at 4:12am - self.hass.helpers.event.track_time_change( - async_purge, hour=4, minute=12, second=0 - ) + track_time_change(self.hass, self.async_purge, hour=4, minute=12, second=0) _LOGGER.debug("Recorder processing the queue") + self.hass.add_job(self._async_recorder_ready) + self._run_event_loop() + + def _run_event_loop(self): + """Run the event loop for the recorder.""" # Use a session for the event read loop # with a commit every time the event time # has changed. This reduces the disk io. - while True: - event = self.queue.get() + while event := self.queue.get(): + try: + self._process_one_event_or_recover(event) + except Exception as err: # pylint: disable=broad-except + _LOGGER.exception("Error while processing event %s: %s", event, err) - if event is None: - self._shutdown() - return + self._shutdown() + def _process_one_event_or_recover(self, event): + """Process an event, reconnect, or recover a malformed database.""" + try: self._process_one_event(event) + return + except exc.DatabaseError as err: + if self._handle_database_error(err): + return + _LOGGER.exception( + "Unhandled database error while processing event %s: %s", event, err + ) + except SQLAlchemyError as err: + _LOGGER.exception( + "SQLAlchemyError error processing event %s: %s", event, err + ) - def _setup_recorder(self) -> bool: - """Create schema and connect to the database.""" + # Reset the session if an SQLAlchemyError (including DatabaseError) + # happens to rollback and recover + self._reopen_event_session() + + def _setup_recorder(self) -> None | int: + """Create connect to the database and get the schema version.""" tries = 1 while tries <= self.db_max_retries: try: self._setup_connection() - migration.migrate_schema(self) - self._setup_run() + return migration.get_schema_version(self) except Exception as err: # pylint: disable=broad-except _LOGGER.exception( "Error during connection setup to %s: %s (retrying in %s seconds)", @@ -404,37 +505,47 @@ def _setup_recorder(self) -> bool: err, self.db_retry_wait, ) - else: - _LOGGER.debug("Connected to recorder database") - self._open_event_session() - return True - tries += 1 time.sleep(self.db_retry_wait) - @callback - def connection_failed(): - """Connect failed tasks.""" - self.async_db_ready.set_result(False) - persistent_notification.async_create( - self.hass, - "The recorder could not start, please check the log", - "Recorder", - ) + return None - self.hass.add_job(connection_failed) - return False + def _migrate_schema_and_setup_run(self, current_version) -> bool: + """Migrate schema to the latest version.""" + persistent_notification.create( + self.hass, + "System performance will temporarily degrade during the database upgrade. Do not power down or restart the system until the upgrade completes. Integrations that read the database, such as logbook and history, may return inconsistent results until the upgrade completes.", + "Database upgrade in progress", + "recorder_database_migration", + ) + + try: + migration.migrate_schema(self, current_version) + except exc.DatabaseError as err: + if self._handle_database_error(err): + return True + _LOGGER.exception("Database error during schema migration") + return False + except Exception: # pylint: disable=broad-except + _LOGGER.exception("Error during schema migration") + return False + else: + self._setup_run() + return True + finally: + persistent_notification.dismiss(self.hass, "recorder_database_migration") + + def _run_purge(self, keep_days, repack, apply_filter): + """Purge the database.""" + if purge.purge_old_data(self, keep_days, repack, apply_filter): + return + # Schedule a new purge task if this one didn't finish + self.queue.put(PurgeTask(keep_days, repack, apply_filter)) def _process_one_event(self, event): """Process one event.""" if isinstance(event, PurgeTask): - # Schedule a new purge task if this one didn't finish - if not purge.purge_old_data( - self, event.keep_days, event.repack, event.apply_filter - ): - self.queue.put( - PurgeTask(event.keep_days, event.repack, event.apply_filter) - ) + self._run_purge(event.keep_days, event.repack, event.apply_filter) return if isinstance(event, WaitTask): self._queue_watch.set() @@ -448,7 +559,7 @@ def _process_one_event(self, event): self._timechanges_seen += 1 if self._timechanges_seen >= self.commit_interval: self._timechanges_seen = 0 - self._commit_event_session_or_recover() + self._commit_event_session_or_retry() return if not self.enabled: @@ -464,10 +575,6 @@ def _process_one_event(self, event): except (TypeError, ValueError): _LOGGER.warning("Event is not JSON serializable: %s", event) return - except Exception as err: # pylint: disable=broad-except - # Must catch the exception to prevent the loop from collapsing - _LOGGER.exception("Error adding event: %s", err) - return if event.event_type == EVENT_STATE_CHANGED: try: @@ -492,34 +599,21 @@ def _process_one_event(self, event): "State is not JSON serializable: %s", event.data.get("new_state"), ) - except Exception as err: # pylint: disable=broad-except - # Must catch the exception to prevent the loop from collapsing - _LOGGER.exception("Error adding state change: %s", err) # If they do not have a commit interval # than we commit right away if not self.commit_interval: - self._commit_event_session_or_recover() - - def _commit_event_session_or_recover(self): - """Commit changes to the database and recover if the database fails when possible.""" - try: self._commit_event_session_or_retry() - return - except exc.DatabaseError as err: - if isinstance(err.__cause__, sqlite3.DatabaseError): - _LOGGER.exception( - "Unrecoverable sqlite3 database corruption detected: %s", err - ) - self._handle_sqlite_corruption() - return - _LOGGER.exception("Unexpected error saving events: %s", err) - except Exception as err: # pylint: disable=broad-except - # Must catch the exception to prevent the loop from collapsing - _LOGGER.exception("Unexpected error saving events: %s", err) - self._reopen_event_session() - return + def _handle_database_error(self, err): + """Handle a database error that may result in moving away the corrupt db.""" + if isinstance(err.__cause__, sqlite3.DatabaseError): + _LOGGER.exception( + "Unrecoverable sqlite3 database corruption detected: %s", err + ) + self._handle_sqlite_corruption() + return True + return False def _commit_event_session_or_retry(self): tries = 1 @@ -566,44 +660,41 @@ def _commit_event_session(self): def _handle_sqlite_corruption(self): """Handle the sqlite3 database being corrupt.""" + self._close_event_session() self._close_connection() move_away_broken_database(dburl_to_path(self.db_url)) self._setup_recorder() + self._setup_run() - def _reopen_event_session(self): - """Rollback the event session and reopen it after a failure.""" + def _close_event_session(self): + """Close the event session.""" self._old_states = {} + if not self.event_session: + return + try: self.event_session.rollback() self.event_session.close() - except Exception as err: # pylint: disable=broad-except - # Must catch the exception to prevent the loop from collapsing + except SQLAlchemyError as err: _LOGGER.exception( "Error while rolling back and closing the event session: %s", err ) + def _reopen_event_session(self): + """Rollback the event session and reopen it after a failure.""" + self._close_event_session() self._open_event_session() def _open_event_session(self): """Open the event session.""" - try: - self.event_session = self.get_session() - self.event_session.expire_on_commit = False - except Exception as err: # pylint: disable=broad-except - _LOGGER.exception("Error while creating new event session: %s", err) + self.event_session = self.get_session() + self.event_session.expire_on_commit = False def _send_keep_alive(self): - try: - _LOGGER.debug("Sending keepalive") - self.event_session.connection().scalar(select([1])) - return - except Exception as err: # pylint: disable=broad-except - _LOGGER.error( - "Error in database connectivity during keepalive: %s", - err, - ) - self._reopen_event_session() + """Send a keep alive to keep the db connection open.""" + _LOGGER.debug("Sending keepalive") + self.event_session.connection().scalar(select([1])) @callback def event_listener(self, event): @@ -663,20 +754,7 @@ def setup_recorder_connection(dbapi_connection, connection_record): kwargs["echo"] = False if self._using_file_sqlite: - with self.hass.timeout.freeze(DOMAIN): - # - # Here we run an sqlite3 quick_check. In the majority - # of cases, the quick_check takes under 10 seconds. - # - # On systems with very large databases and - # very slow disk or cpus, this can take a while. - # - validate_or_move_away_sqlite_database( - self.db_url, self.db_integrity_check - ) - - if self.engine is not None: - self.engine.dispose() + validate_or_move_away_sqlite_database(self.db_url) self.engine = create_engine(self.db_url, **kwargs) @@ -684,6 +762,7 @@ def setup_recorder_connection(dbapi_connection, connection_record): Base.metadata.create_all(self.engine) self.get_session = scoped_session(sessionmaker(bind=self.engine)) + _LOGGER.debug("Connected to recorder database") @property def _using_file_sqlite(self): @@ -716,18 +795,24 @@ def _setup_run(self): session.flush() session.expunge(self.run_info) - def _shutdown(self): - """Save end time for current run.""" - if self.event_session is not None: + self._open_event_session() + + def _end_session(self): + """End the recorder session.""" + if self.event_session is None: + return + try: self.run_info.end = dt_util.utcnow() self.event_session.add(self.run_info) - try: - self._commit_event_session_or_retry() - self.event_session.close() - except Exception as err: # pylint: disable=broad-except - _LOGGER.exception( - "Error saving the event session during shutdown: %s", err - ) + self._commit_event_session_or_retry() + self.event_session.close() + except Exception as err: # pylint: disable=broad-except + _LOGGER.exception("Error saving the event session during shutdown: %s", err) self.run_info = None + + def _shutdown(self): + """Save end time for current run.""" + self._stop_queue_watcher_and_event_listener() + self._end_session() self._close_connection() diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py index fa93f6155617c..5f138d01f176c 100644 --- a/homeassistant/components/recorder/migration.py +++ b/homeassistant/components/recorder/migration.py @@ -11,15 +11,14 @@ ) from sqlalchemy.schema import AddConstraint, DropConstraint -from .const import DOMAIN from .models import SCHEMA_VERSION, TABLE_STATES, Base, SchemaChanges from .util import session_scope _LOGGER = logging.getLogger(__name__) -def migrate_schema(instance): - """Check if the schema needs to be upgraded.""" +def get_schema_version(instance): + """Get the schema version.""" with session_scope(session=instance.get_session()) as session: res = ( session.query(SchemaChanges) @@ -34,21 +33,27 @@ def migrate_schema(instance): "No schema version found. Inspected version: %s", current_version ) - if current_version == SCHEMA_VERSION: - return + return current_version + + +def schema_is_current(current_version): + """Check if the schema is current.""" + return current_version == SCHEMA_VERSION + +def migrate_schema(instance, current_version): + """Check if the schema needs to be upgraded.""" + with session_scope(session=instance.get_session()) as session: _LOGGER.warning( "Database is about to upgrade. Schema version: %s", current_version ) + for version in range(current_version, SCHEMA_VERSION): + new_version = version + 1 + _LOGGER.info("Upgrading recorder db schema to version %s", new_version) + _apply_update(instance.engine, new_version, current_version) + session.add(SchemaChanges(schema_version=new_version)) - with instance.hass.timeout.freeze(DOMAIN): - for version in range(current_version, SCHEMA_VERSION): - new_version = version + 1 - _LOGGER.info("Upgrading recorder db schema to version %s", new_version) - _apply_update(instance.engine, new_version, current_version) - session.add(SchemaChanges(schema_version=new_version)) - - _LOGGER.info("Upgrade to version %s done", new_version) + _LOGGER.info("Upgrade to version %s done", new_version) def _create_index(engine, table_name, index_name): diff --git a/homeassistant/components/recorder/purge.py b/homeassistant/components/recorder/purge.py index ef626a744c4fa..424070156b05c 100644 --- a/homeassistant/components/recorder/purge.py +++ b/homeassistant/components/recorder/purge.py @@ -6,7 +6,7 @@ import time from typing import TYPE_CHECKING -from sqlalchemy.exc import OperationalError, SQLAlchemyError +from sqlalchemy.exc import OperationalError from sqlalchemy.orm.session import Session from sqlalchemy.sql.expression import distinct @@ -69,8 +69,7 @@ def purge_old_data( return False _LOGGER.warning("Error purging history: %s", err) - except SQLAlchemyError as err: - _LOGGER.warning("Error purging history: %s", err) + return True diff --git a/homeassistant/components/recorder/util.py b/homeassistant/components/recorder/util.py index c17fb33d365ed..89f74c44f4e67 100644 --- a/homeassistant/components/recorder/util.py +++ b/homeassistant/components/recorder/util.py @@ -14,8 +14,13 @@ from homeassistant.helpers.typing import HomeAssistantType import homeassistant.util.dt as dt_util -from .const import CONF_DB_INTEGRITY_CHECK, DATA_INSTANCE, SQLITE_URL_PREFIX -from .models import ALL_TABLES, process_timestamp +from .const import DATA_INSTANCE, SQLITE_URL_PREFIX +from .models import ( + ALL_TABLES, + TABLE_RECORDER_RUNS, + TABLE_SCHEMA_CHANGES, + process_timestamp, +) _LOGGER = logging.getLogger(__name__) @@ -117,7 +122,7 @@ def execute(qry, to_native=False, validate_entity_ids=True): time.sleep(QUERY_RETRY_WAIT) -def validate_or_move_away_sqlite_database(dburl: str, db_integrity_check: bool) -> bool: +def validate_or_move_away_sqlite_database(dburl: str) -> bool: """Ensure that the database is valid or move it away.""" dbpath = dburl_to_path(dburl) @@ -125,7 +130,7 @@ def validate_or_move_away_sqlite_database(dburl: str, db_integrity_check: bool) # Database does not exist yet, this is OK return True - if not validate_sqlite_database(dbpath, db_integrity_check): + if not validate_sqlite_database(dbpath): move_away_broken_database(dbpath) return False @@ -161,18 +166,21 @@ def basic_sanity_check(cursor): """Check tables to make sure select does not fail.""" for table in ALL_TABLES: - cursor.execute(f"SELECT * FROM {table} LIMIT 1;") # nosec # not injection + if table in (TABLE_RECORDER_RUNS, TABLE_SCHEMA_CHANGES): + cursor.execute(f"SELECT * FROM {table};") # nosec # not injection + else: + cursor.execute(f"SELECT * FROM {table} LIMIT 1;") # nosec # not injection return True -def validate_sqlite_database(dbpath: str, db_integrity_check: bool) -> bool: +def validate_sqlite_database(dbpath: str) -> bool: """Run a quick check on an sqlite database to see if it is corrupt.""" import sqlite3 # pylint: disable=import-outside-toplevel try: conn = sqlite3.connect(dbpath) - run_checks_on_open_db(dbpath, conn.cursor(), db_integrity_check) + run_checks_on_open_db(dbpath, conn.cursor()) conn.close() except sqlite3.DatabaseError: _LOGGER.exception("The database at %s is corrupt or malformed", dbpath) @@ -181,24 +189,14 @@ def validate_sqlite_database(dbpath: str, db_integrity_check: bool) -> bool: return True -def run_checks_on_open_db(dbpath, cursor, db_integrity_check): +def run_checks_on_open_db(dbpath, cursor): """Run checks that will generate a sqlite3 exception if there is corruption.""" sanity_check_passed = basic_sanity_check(cursor) last_run_was_clean = last_run_was_recently_clean(cursor) if sanity_check_passed and last_run_was_clean: _LOGGER.debug( - "The quick_check will be skipped as the system was restarted cleanly and passed the basic sanity check" - ) - return - - if not db_integrity_check: - # Always warn so when it does fail they remember it has - # been manually disabled - _LOGGER.warning( - "The quick_check on the sqlite3 database at %s was skipped because %s was disabled", - dbpath, - CONF_DB_INTEGRITY_CHECK, + "The system was restarted cleanly and passed the basic sanity check" ) return @@ -214,11 +212,6 @@ def run_checks_on_open_db(dbpath, cursor, db_integrity_check): dbpath, ) - _LOGGER.info( - "A quick_check is being performed on the sqlite3 database at %s", dbpath - ) - cursor.execute("PRAGMA QUICK_CHECK") - def move_away_broken_database(dbfile: str) -> None: """Move away a broken sqlite3 database.""" diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py index b3c58995b37e2..67032e9f077d2 100644 --- a/tests/components/recorder/test_init.py +++ b/tests/components/recorder/test_init.py @@ -3,13 +3,14 @@ from datetime import datetime, timedelta from unittest.mock import patch -from sqlalchemy.exc import OperationalError +from sqlalchemy.exc import OperationalError, SQLAlchemyError +from homeassistant.components import recorder from homeassistant.components.recorder import ( CONF_DB_URL, CONFIG_SCHEMA, - DATA_INSTANCE, DOMAIN, + KEEPALIVE_TIME, SERVICE_DISABLE, SERVICE_ENABLE, SERVICE_PURGE, @@ -19,15 +20,17 @@ run_information_from_instance, run_information_with_session, ) +from homeassistant.components.recorder.const import DATA_INSTANCE from homeassistant.components.recorder.models import Events, RecorderRuns, States from homeassistant.components.recorder.util import session_scope from homeassistant.const import ( + EVENT_HOMEASSISTANT_STARTED, EVENT_HOMEASSISTANT_STOP, MATCH_ALL, STATE_LOCKED, STATE_UNLOCKED, ) -from homeassistant.core import Context, CoreState, callback +from homeassistant.core import Context, CoreState, HomeAssistant, callback from homeassistant.helpers.typing import HomeAssistantType from homeassistant.setup import async_setup_component, setup_component from homeassistant.util import dt as dt_util @@ -41,18 +44,35 @@ from .conftest import SetupRecorderInstanceT from tests.common import ( + async_fire_time_changed, async_init_recorder_component, fire_time_changed, get_test_home_assistant, ) +def _default_recorder(hass): + """Return a recorder with reasonable defaults.""" + return Recorder( + hass, + auto_purge=True, + keep_days=7, + commit_interval=1, + uri="sqlite://", + db_max_retries=10, + db_retry_wait=3, + entity_filter=CONFIG_SCHEMA({DOMAIN: {}}), + exclude_t=[], + ) + + async def test_shutdown_before_startup_finishes(hass): """Test shutdown before recorder starts is clean.""" hass.state = CoreState.not_running await async_init_recorder_component(hass) + await hass.data[DATA_INSTANCE].async_db_ready await hass.async_block_till_done() session = await hass.async_add_executor_job(hass.data[DATA_INSTANCE].get_session) @@ -69,6 +89,31 @@ async def test_shutdown_before_startup_finishes(hass): assert run_info.end is not None +async def test_state_gets_saved_when_set_before_start_event( + hass: HomeAssistant, async_setup_recorder_instance: SetupRecorderInstanceT +): + """Test we can record an event when starting with not running.""" + + hass.state = CoreState.not_running + + await async_init_recorder_component(hass) + + entity_id = "test.recorder" + state = "restoring_from_db" + attributes = {"test_attr": 5, "test_attr_10": "nice"} + + hass.states.async_set(entity_id, state, attributes) + + hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED) + + await async_wait_recording_done_without_instance(hass) + + with session_scope(hass=hass) as session: + db_states = list(session.query(States)) + assert len(db_states) == 1 + assert db_states[0].event_id > 0 + + async def test_saving_state( hass: HomeAssistantType, async_setup_recorder_instance: SetupRecorderInstanceT ): @@ -92,6 +137,58 @@ async def test_saving_state( assert state == _state_empty_context(hass, entity_id) +async def test_saving_many_states( + hass: HomeAssistantType, async_setup_recorder_instance: SetupRecorderInstanceT +): + """Test we expire after many commits.""" + instance = await async_setup_recorder_instance(hass) + + entity_id = "test.recorder" + attributes = {"test_attr": 5, "test_attr_10": "nice"} + + with patch.object( + hass.data[DATA_INSTANCE].event_session, "expire_all" + ) as expire_all, patch.object(recorder, "EXPIRE_AFTER_COMMITS", 2): + for _ in range(3): + hass.states.async_set(entity_id, "on", attributes) + await async_wait_recording_done(hass, instance) + hass.states.async_set(entity_id, "off", attributes) + await async_wait_recording_done(hass, instance) + + assert expire_all.called + + with session_scope(hass=hass) as session: + db_states = list(session.query(States)) + assert len(db_states) == 6 + assert db_states[0].event_id > 0 + + +async def test_saving_state_with_intermixed_time_changes( + hass: HomeAssistantType, async_setup_recorder_instance: SetupRecorderInstanceT +): + """Test saving states with intermixed time changes.""" + instance = await async_setup_recorder_instance(hass) + + entity_id = "test.recorder" + state = "restoring_from_db" + attributes = {"test_attr": 5, "test_attr_10": "nice"} + attributes2 = {"test_attr": 10, "test_attr_10": "mean"} + + for _ in range(KEEPALIVE_TIME + 1): + async_fire_time_changed(hass, dt_util.utcnow()) + hass.states.async_set(entity_id, state, attributes) + for _ in range(KEEPALIVE_TIME + 1): + async_fire_time_changed(hass, dt_util.utcnow()) + hass.states.async_set(entity_id, state, attributes2) + + await async_wait_recording_done(hass, instance) + + with session_scope(hass=hass) as session: + db_states = list(session.query(States)) + assert len(db_states) == 2 + assert db_states[0].event_id > 0 + + def test_saving_state_with_exception(hass, hass_recorder, caplog): """Test saving and restoring a state.""" hass = hass_recorder() @@ -130,6 +227,44 @@ def _throw_if_state_in_session(*args, **kwargs): assert "Error saving events" not in caplog.text +def test_saving_state_with_sqlalchemy_exception(hass, hass_recorder, caplog): + """Test saving state when there is an SQLAlchemyError.""" + hass = hass_recorder() + + entity_id = "test.recorder" + state = "restoring_from_db" + attributes = {"test_attr": 5, "test_attr_10": "nice"} + + def _throw_if_state_in_session(*args, **kwargs): + for obj in hass.data[DATA_INSTANCE].event_session: + if isinstance(obj, States): + raise SQLAlchemyError( + "insert the state", "fake params", "forced to fail" + ) + + with patch("time.sleep"), patch.object( + hass.data[DATA_INSTANCE].event_session, + "flush", + side_effect=_throw_if_state_in_session, + ): + hass.states.set(entity_id, "fail", attributes) + wait_recording_done(hass) + + assert "SQLAlchemyError error processing event" in caplog.text + + caplog.clear() + hass.states.set(entity_id, state, attributes) + wait_recording_done(hass) + + with session_scope(hass=hass) as session: + db_states = list(session.query(States)) + assert len(db_states) >= 1 + + assert "Error executing query" not in caplog.text + assert "Error saving events" not in caplog.text + assert "SQLAlchemyError error processing event" not in caplog.text + + def test_saving_event(hass, hass_recorder): """Test saving and restoring an event.""" hass = hass_recorder() @@ -171,6 +306,25 @@ def event_listener(event): ) +def test_saving_state_with_commit_interval_zero(hass_recorder): + """Test saving a state with a commit interval of zero.""" + hass = hass_recorder({"commit_interval": 0}) + assert hass.data[DATA_INSTANCE].commit_interval == 0 + + entity_id = "test.recorder" + state = "restoring_from_db" + attributes = {"test_attr": 5, "test_attr_10": "nice"} + + hass.states.set(entity_id, state, attributes) + + wait_recording_done(hass) + + with session_scope(hass=hass) as session: + db_states = list(session.query(States)) + assert len(db_states) == 1 + assert db_states[0].event_id > 0 + + def _add_entities(hass, entity_ids): """Add entities.""" attributes = {"test_attr": 5, "test_attr_10": "nice"} @@ -351,26 +505,27 @@ def test_saving_state_and_removing_entity(hass, hass_recorder): assert states[2].state is None -def test_recorder_setup_failure(): +def test_recorder_setup_failure(hass): """Test some exceptions.""" - hass = get_test_home_assistant() + with patch.object(Recorder, "_setup_connection") as setup, patch( + "homeassistant.components.recorder.time.sleep" + ): + setup.side_effect = ImportError("driver not found") + rec = _default_recorder(hass) + rec.async_initialize() + rec.start() + rec.join() + + hass.stop() + +def test_recorder_setup_failure_without_event_listener(hass): + """Test recorder setup failure when the event listener is not setup.""" with patch.object(Recorder, "_setup_connection") as setup, patch( "homeassistant.components.recorder.time.sleep" ): setup.side_effect = ImportError("driver not found") - rec = Recorder( - hass, - auto_purge=True, - keep_days=7, - commit_interval=1, - uri="sqlite://", - db_max_retries=10, - db_retry_wait=3, - entity_filter=CONFIG_SCHEMA({DOMAIN: {}}), - exclude_t=[], - db_integrity_check=False, - ) + rec = _default_recorder(hass) rec.start() rec.join() @@ -481,6 +636,7 @@ def test_saving_state_with_serializable_data(hass_recorder, caplog): """Test saving data that cannot be serialized does not crash.""" hass = hass_recorder() + hass.bus.fire("bad_event", {"fail": CannotSerializeMe()}) hass.states.set("test.one", "on", {"fail": CannotSerializeMe()}) wait_recording_done(hass) hass.states.set("test.two", "on", {}) @@ -699,15 +855,20 @@ def _create_tmpdir_for_test_db(): hass.states.async_set("test.lost", "on", {}) - await async_wait_recording_done_without_instance(hass) - await hass.async_add_executor_job(corrupt_db_file, test_db_file) - await async_wait_recording_done_without_instance(hass) - - # This state will not be recorded because - # the database corruption will be discovered - # and we will have to rollback to recover - hass.states.async_set("test.one", "off", {}) - await async_wait_recording_done_without_instance(hass) + with patch.object( + hass.data[DATA_INSTANCE].event_session, + "close", + side_effect=OperationalError("statement", {}, []), + ): + await async_wait_recording_done_without_instance(hass) + await hass.async_add_executor_job(corrupt_db_file, test_db_file) + await async_wait_recording_done_without_instance(hass) + + # This state will not be recorded because + # the database corruption will be discovered + # and we will have to rollback to recover + hass.states.async_set("test.one", "off", {}) + await async_wait_recording_done_without_instance(hass) assert "Unrecoverable sqlite3 database corruption detected" in caplog.text assert "The system will rename the corrupt database file" in caplog.text diff --git a/tests/components/recorder/test_migrate.py b/tests/components/recorder/test_migrate.py index c4e0d32adcf06..113598ff6dece 100644 --- a/tests/components/recorder/test_migrate.py +++ b/tests/components/recorder/test_migrate.py @@ -1,19 +1,41 @@ """The tests for the Recorder component.""" # pylint: disable=protected-access +import datetime +import sqlite3 from unittest.mock import Mock, PropertyMock, call, patch import pytest from sqlalchemy import create_engine -from sqlalchemy.exc import InternalError, OperationalError, ProgrammingError +from sqlalchemy.exc import ( + DatabaseError, + InternalError, + OperationalError, + ProgrammingError, +) from sqlalchemy.pool import StaticPool from homeassistant.bootstrap import async_setup_component -from homeassistant.components.recorder import RecorderRuns, const, migration, models +from homeassistant.components import recorder +from homeassistant.components.recorder import RecorderRuns, migration, models +from homeassistant.components.recorder.const import DATA_INSTANCE +from homeassistant.components.recorder.models import States +from homeassistant.components.recorder.util import session_scope import homeassistant.util.dt as dt_util +from .common import async_wait_recording_done_without_instance + +from tests.common import async_fire_time_changed, async_mock_service from tests.components.recorder import models_original +def _get_native_states(hass, entity_id): + with session_scope(hass=hass) as session: + return [ + state.to_native() + for state in session.query(States).filter(States.entity_id == entity_id) + ] + + def create_engine_test(*args, **kwargs): """Test version of create_engine that initializes with old schema. @@ -26,6 +48,7 @@ def create_engine_test(*args, **kwargs): async def test_schema_update_calls(hass): """Test that schema migrations occur in correct order.""" + await async_setup_component(hass, "persistent_notification", {}) with patch( "homeassistant.components.recorder.create_engine", new=create_engine_test ), patch( @@ -35,16 +58,147 @@ async def test_schema_update_calls(hass): await async_setup_component( hass, "recorder", {"recorder": {"db_url": "sqlite://"}} ) - await hass.async_block_till_done() + await async_wait_recording_done_without_instance(hass) update.assert_has_calls( [ - call(hass.data[const.DATA_INSTANCE].engine, version + 1, 0) + call(hass.data[DATA_INSTANCE].engine, version + 1, 0) for version in range(0, models.SCHEMA_VERSION) ] ) +async def test_database_migration_failed(hass): + """Test we notify if the migration fails.""" + await async_setup_component(hass, "persistent_notification", {}) + create_calls = async_mock_service(hass, "persistent_notification", "create") + dismiss_calls = async_mock_service(hass, "persistent_notification", "dismiss") + + with patch( + "homeassistant.components.recorder.create_engine", new=create_engine_test + ), patch( + "homeassistant.components.recorder.migration._apply_update", + side_effect=ValueError, + ): + await async_setup_component( + hass, "recorder", {"recorder": {"db_url": "sqlite://"}} + ) + hass.states.async_set("my.entity", "on", {}) + hass.states.async_set("my.entity", "off", {}) + await hass.async_block_till_done() + await hass.async_add_executor_job(hass.data[DATA_INSTANCE].join) + await hass.async_block_till_done() + + assert len(create_calls) == 2 + assert len(dismiss_calls) == 1 + + +async def test_database_migration_encounters_corruption(hass): + """Test we move away the database if its corrupt.""" + await async_setup_component(hass, "persistent_notification", {}) + + sqlite3_exception = DatabaseError("statement", {}, []) + sqlite3_exception.__cause__ = sqlite3.DatabaseError() + + with patch( + "homeassistant.components.recorder.migration.schema_is_current", + side_effect=[False, True], + ), patch( + "homeassistant.components.recorder.migration.migrate_schema", + side_effect=sqlite3_exception, + ), patch( + "homeassistant.components.recorder.move_away_broken_database" + ) as move_away: + await async_setup_component( + hass, "recorder", {"recorder": {"db_url": "sqlite://"}} + ) + hass.states.async_set("my.entity", "on", {}) + hass.states.async_set("my.entity", "off", {}) + await async_wait_recording_done_without_instance(hass) + + assert move_away.called + + +async def test_database_migration_encounters_corruption_not_sqlite(hass): + """Test we fail on database error when we cannot recover.""" + await async_setup_component(hass, "persistent_notification", {}) + create_calls = async_mock_service(hass, "persistent_notification", "create") + dismiss_calls = async_mock_service(hass, "persistent_notification", "dismiss") + + with patch( + "homeassistant.components.recorder.migration.schema_is_current", + side_effect=[False, True], + ), patch( + "homeassistant.components.recorder.migration.migrate_schema", + side_effect=DatabaseError("statement", {}, []), + ), patch( + "homeassistant.components.recorder.move_away_broken_database" + ) as move_away: + await async_setup_component( + hass, "recorder", {"recorder": {"db_url": "sqlite://"}} + ) + hass.states.async_set("my.entity", "on", {}) + hass.states.async_set("my.entity", "off", {}) + await hass.async_block_till_done() + await hass.async_add_executor_job(hass.data[DATA_INSTANCE].join) + await hass.async_block_till_done() + + assert not move_away.called + assert len(create_calls) == 2 + assert len(dismiss_calls) == 1 + + +async def test_events_during_migration_are_queued(hass): + """Test that events during migration are queued.""" + + await async_setup_component(hass, "persistent_notification", {}) + with patch( + "homeassistant.components.recorder.create_engine", new=create_engine_test + ): + await async_setup_component( + hass, "recorder", {"recorder": {"db_url": "sqlite://"}} + ) + hass.states.async_set("my.entity", "on", {}) + hass.states.async_set("my.entity", "off", {}) + await hass.async_block_till_done() + async_fire_time_changed(hass, dt_util.utcnow() + datetime.timedelta(hours=2)) + await hass.async_block_till_done() + async_fire_time_changed(hass, dt_util.utcnow() + datetime.timedelta(hours=4)) + await hass.data[DATA_INSTANCE].async_recorder_ready.wait() + await async_wait_recording_done_without_instance(hass) + + db_states = await hass.async_add_executor_job(_get_native_states, hass, "my.entity") + assert len(db_states) == 2 + + +async def test_events_during_migration_queue_exhausted(hass): + """Test that events during migration takes so long the queue is exhausted.""" + await async_setup_component(hass, "persistent_notification", {}) + + with patch( + "homeassistant.components.recorder.create_engine", new=create_engine_test + ), patch.object(recorder, "MAX_QUEUE_BACKLOG", 1): + await async_setup_component( + hass, "recorder", {"recorder": {"db_url": "sqlite://"}} + ) + hass.states.async_set("my.entity", "on", {}) + await hass.async_block_till_done() + async_fire_time_changed(hass, dt_util.utcnow() + datetime.timedelta(hours=2)) + await hass.async_block_till_done() + async_fire_time_changed(hass, dt_util.utcnow() + datetime.timedelta(hours=4)) + await hass.async_block_till_done() + hass.states.async_set("my.entity", "off", {}) + await hass.data[DATA_INSTANCE].async_recorder_ready.wait() + await async_wait_recording_done_without_instance(hass) + + db_states = await hass.async_add_executor_job(_get_native_states, hass, "my.entity") + assert len(db_states) == 1 + hass.states.async_set("my.entity", "on", {}) + await async_wait_recording_done_without_instance(hass) + db_states = await hass.async_add_executor_job(_get_native_states, hass, "my.entity") + assert len(db_states) == 2 + + async def test_schema_migrate(hass): """Test the full schema migration logic. @@ -53,6 +207,8 @@ async def test_schema_migrate(hass): inspection could quickly become quite cumbersome. """ + await async_setup_component(hass, "persistent_notification", {}) + def _mock_setup_run(self): self.run_info = RecorderRuns( start=self.recording_start, created=dt_util.utcnow() diff --git a/tests/components/recorder/test_purge.py b/tests/components/recorder/test_purge.py index f2fa9bf640026..b97873df62e2a 100644 --- a/tests/components/recorder/test_purge.py +++ b/tests/components/recorder/test_purge.py @@ -1,7 +1,10 @@ """Test data purging.""" from datetime import datetime, timedelta import json +import sqlite3 +from unittest.mock import patch +from sqlalchemy.exc import DatabaseError from sqlalchemy.orm.session import Session from homeassistant.components import recorder @@ -16,6 +19,7 @@ async_recorder_block_till_done, async_wait_purge_done, async_wait_recording_done, + async_wait_recording_done_without_instance, ) from .conftest import SetupRecorderInstanceT @@ -52,6 +56,38 @@ async def test_purge_old_states( assert states.count() == 2 +async def test_purge_old_states_encouters_database_corruption( + hass: HomeAssistantType, async_setup_recorder_instance: SetupRecorderInstanceT +): + """Test database image image is malformed while deleting old states.""" + instance = await async_setup_recorder_instance(hass) + + await _add_test_states(hass, instance) + await async_wait_recording_done_without_instance(hass) + + sqlite3_exception = DatabaseError("statement", {}, []) + sqlite3_exception.__cause__ = sqlite3.DatabaseError() + + with patch( + "homeassistant.components.recorder.move_away_broken_database" + ) as move_away, patch( + "homeassistant.components.recorder.purge.purge_old_data", + side_effect=sqlite3_exception, + ): + await hass.services.async_call( + recorder.DOMAIN, recorder.SERVICE_PURGE, {"keep_days": 0} + ) + await hass.async_block_till_done() + await async_wait_recording_done_without_instance(hass) + + assert move_away.called + + # Ensure the whole database was reset due to the database error + with session_scope(hass=hass) as session: + states_after_purge = session.query(States) + assert states_after_purge.count() == 0 + + async def test_purge_old_events( hass: HomeAssistantType, async_setup_recorder_instance: SetupRecorderInstanceT ): diff --git a/tests/components/recorder/test_util.py b/tests/components/recorder/test_util.py index c814570416c32..4da635209b334 100644 --- a/tests/components/recorder/test_util.py +++ b/tests/components/recorder/test_util.py @@ -74,75 +74,28 @@ def to_native(validate_entity_id=True): assert e_mock.call_count == 2 -def test_validate_or_move_away_sqlite_database_with_integrity_check( - hass, tmpdir, caplog -): - """Ensure a malformed sqlite database is moved away. - - A quick_check is run here - """ - - db_integrity_check = True - - test_dir = tmpdir.mkdir("test_validate_or_move_away_sqlite_database") - test_db_file = f"{test_dir}/broken.db" - dburl = f"{SQLITE_URL_PREFIX}{test_db_file}" - - assert util.validate_sqlite_database(test_db_file, db_integrity_check) is False - assert os.path.exists(test_db_file) is True - assert ( - util.validate_or_move_away_sqlite_database(dburl, db_integrity_check) is False - ) - - corrupt_db_file(test_db_file) - - assert util.validate_sqlite_database(dburl, db_integrity_check) is False - - assert ( - util.validate_or_move_away_sqlite_database(dburl, db_integrity_check) is False - ) - - assert "corrupt or malformed" in caplog.text - - assert util.validate_sqlite_database(dburl, db_integrity_check) is False - - assert util.validate_or_move_away_sqlite_database(dburl, db_integrity_check) is True - - -def test_validate_or_move_away_sqlite_database_without_integrity_check( - hass, tmpdir, caplog -): - """Ensure a malformed sqlite database is moved away. - - The quick_check is skipped, but we can still find - corruption if the whole database is unreadable - """ - - db_integrity_check = False +def test_validate_or_move_away_sqlite_database(hass, tmpdir, caplog): + """Ensure a malformed sqlite database is moved away.""" test_dir = tmpdir.mkdir("test_validate_or_move_away_sqlite_database") test_db_file = f"{test_dir}/broken.db" dburl = f"{SQLITE_URL_PREFIX}{test_db_file}" - assert util.validate_sqlite_database(test_db_file, db_integrity_check) is False + assert util.validate_sqlite_database(test_db_file) is False assert os.path.exists(test_db_file) is True - assert ( - util.validate_or_move_away_sqlite_database(dburl, db_integrity_check) is False - ) + assert util.validate_or_move_away_sqlite_database(dburl) is False corrupt_db_file(test_db_file) - assert util.validate_sqlite_database(dburl, db_integrity_check) is False + assert util.validate_sqlite_database(dburl) is False - assert ( - util.validate_or_move_away_sqlite_database(dburl, db_integrity_check) is False - ) + assert util.validate_or_move_away_sqlite_database(dburl) is False assert "corrupt or malformed" in caplog.text - assert util.validate_sqlite_database(dburl, db_integrity_check) is False + assert util.validate_sqlite_database(dburl) is False - assert util.validate_or_move_away_sqlite_database(dburl, db_integrity_check) is True + assert util.validate_or_move_away_sqlite_database(dburl) is True async def test_last_run_was_recently_clean(hass): @@ -197,12 +150,10 @@ def test_combined_checks(hass_recorder, caplog): cursor = hass.data[DATA_INSTANCE].engine.raw_connection().cursor() - assert util.run_checks_on_open_db("fake_db_path", cursor, False) is None - assert "skipped because db_integrity_check was disabled" in caplog.text + assert util.run_checks_on_open_db("fake_db_path", cursor) is None + assert "could not validate that the sqlite3 database" in caplog.text caplog.clear() - assert util.run_checks_on_open_db("fake_db_path", cursor, True) is None - assert "could not validate that the sqlite3 database" in caplog.text # We are patching recorder.util here in order # to avoid creating the full database on disk @@ -210,50 +161,36 @@ def test_combined_checks(hass_recorder, caplog): "homeassistant.components.recorder.util.basic_sanity_check", return_value=False ): caplog.clear() - assert util.run_checks_on_open_db("fake_db_path", cursor, False) is None - assert "skipped because db_integrity_check was disabled" in caplog.text - - caplog.clear() - assert util.run_checks_on_open_db("fake_db_path", cursor, True) is None + assert util.run_checks_on_open_db("fake_db_path", cursor) is None assert "could not validate that the sqlite3 database" in caplog.text # We are patching recorder.util here in order # to avoid creating the full database on disk with patch("homeassistant.components.recorder.util.last_run_was_recently_clean"): caplog.clear() - assert util.run_checks_on_open_db("fake_db_path", cursor, False) is None - assert ( - "system was restarted cleanly and passed the basic sanity check" - in caplog.text - ) - - caplog.clear() - assert util.run_checks_on_open_db("fake_db_path", cursor, True) is None - assert ( - "system was restarted cleanly and passed the basic sanity check" - in caplog.text - ) + assert util.run_checks_on_open_db("fake_db_path", cursor) is None + assert "restarted cleanly and passed the basic sanity check" in caplog.text caplog.clear() with patch( "homeassistant.components.recorder.util.last_run_was_recently_clean", side_effect=sqlite3.DatabaseError, ), pytest.raises(sqlite3.DatabaseError): - util.run_checks_on_open_db("fake_db_path", cursor, False) + util.run_checks_on_open_db("fake_db_path", cursor) caplog.clear() with patch( "homeassistant.components.recorder.util.last_run_was_recently_clean", side_effect=sqlite3.DatabaseError, ), pytest.raises(sqlite3.DatabaseError): - util.run_checks_on_open_db("fake_db_path", cursor, True) + util.run_checks_on_open_db("fake_db_path", cursor) cursor.execute("DROP TABLE events;") caplog.clear() with pytest.raises(sqlite3.DatabaseError): - util.run_checks_on_open_db("fake_db_path", cursor, False) + util.run_checks_on_open_db("fake_db_path", cursor) caplog.clear() with pytest.raises(sqlite3.DatabaseError): - util.run_checks_on_open_db("fake_db_path", cursor, True) + util.run_checks_on_open_db("fake_db_path", cursor)