From 95f2baa3b3f619d16ce64a8474e5070a72c102c7 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sun, 8 May 2022 16:01:00 -0500 Subject: [PATCH 1/3] Make database access in the eventloop raise an exception - In 2022.4 we added a warning that the database executor should be used for database access which revealed places where the database was accessed in the eventloop - If the database is accessed in the default executor, it will continue to report the access, but proceed as normal. --- homeassistant/components/recorder/pool.py | 11 +++++++++++ tests/common.py | 5 ++--- tests/components/recorder/test_init.py | 18 ++++++++++++++---- tests/components/recorder/test_pool.py | 14 +++++++------- tests/components/recorder/test_util.py | 6 +++++- tests/conftest.py | 5 ++--- 6 files changed, 41 insertions(+), 18 deletions(-) diff --git a/homeassistant/components/recorder/pool.py b/homeassistant/components/recorder/pool.py index 027b9bfbc25876..0d5f81369d977c 100644 --- a/homeassistant/components/recorder/pool.py +++ b/homeassistant/components/recorder/pool.py @@ -20,6 +20,16 @@ POOL_SIZE = 5 +def _raise_if_main_thread() -> None: + """Raise an exception if we are running in the main thread.""" + if threading.current_thread() == threading.main_thread(): + raise RuntimeError( + "Detected database access from the event loop; This is causing stability issues; " + "Use homeassistant.components.recorder.get_instance(hass).async_add_executor_job() " + "for database operations" + ) + + class RecorderPool(SingletonThreadPool, NullPool): # type: ignore[misc] """A hybrid of NullPool and SingletonThreadPool. @@ -62,6 +72,7 @@ def dispose(self) -> None: def _do_get(self) -> Any: if self.recorder_or_dbworker: return super()._do_get() + _raise_if_main_thread() report( "accesses the database without the database executor; " "Use homeassistant.components.recorder.get_instance(hass).async_add_executor_job() " diff --git a/tests/common.py b/tests/common.py index 73625bcfe071df..2289d5a0617ac6 100644 --- a/tests/common.py +++ b/tests/common.py @@ -905,9 +905,8 @@ def init_recorder_component(hass, add_config=None): if recorder.CONF_COMMIT_INTERVAL not in config: config[recorder.CONF_COMMIT_INTERVAL] = 0 - with patch( - "homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", - True, + with patch("homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", True,), patch( + "homeassistant.components.recorder.pool._raise_if_main_thread", ), patch("homeassistant.components.recorder.migration.migrate_schema"): assert setup_component(hass, recorder.DOMAIN, {recorder.DOMAIN: config}) assert recorder.DOMAIN in hass.config.components diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py index 0b2b7b2dcb8d0c..c55a94a34e8cfc 100644 --- a/tests/components/recorder/test_init.py +++ b/tests/components/recorder/test_init.py @@ -951,7 +951,8 @@ def test_statistics_runs_initiated(hass_recorder): ) - timedelta(minutes=5) -def test_compile_missing_statistics(tmpdir): +@patch("homeassistant.components.recorder.pool._raise_if_main_thread") +def test_compile_missing_statistics(mock_main_thread, tmpdir): """Test missing statistics are compiled on startup.""" now = dt_util.utcnow().replace(minute=0, second=0, microsecond=0) test_db_file = tmpdir.mkdir("sqlite").join("test_run_info.db") @@ -1168,7 +1169,8 @@ def test_service_disable_states_not_recording(hass, hass_recorder): assert db_states[0].to_native() == _state_with_context(hass, "test.two") -def test_service_disable_run_information_recorded(tmpdir): +@patch("homeassistant.components.recorder.pool._raise_if_main_thread") +def test_service_disable_run_information_recorded(mock_main_thread, tmpdir): """Test that runs are still recorded when recorder is disabled.""" test_db_file = tmpdir.mkdir("sqlite").join("test_run_info.db") dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}" @@ -1318,8 +1320,12 @@ def test_entity_id_filter(hass_recorder): assert len(db_events) == idx + 1, data +@patch("homeassistant.components.recorder.pool._raise_if_main_thread") async def test_database_lock_and_unlock( - hass: HomeAssistant, async_setup_recorder_instance: SetupRecorderInstanceT, tmp_path + mock_main_thread, + hass: HomeAssistant, + async_setup_recorder_instance: SetupRecorderInstanceT, + tmp_path, ): """Test writing events during lock getting written after unlocking.""" # Use file DB, in memory DB cannot do write locks. @@ -1357,8 +1363,12 @@ async def test_database_lock_and_unlock( assert len(db_events) == 1 +@patch("homeassistant.components.recorder.pool._raise_if_main_thread") async def test_database_lock_and_overflow( - hass: HomeAssistant, async_setup_recorder_instance: SetupRecorderInstanceT, tmp_path + mock_main_thread, + hass: HomeAssistant, + async_setup_recorder_instance: SetupRecorderInstanceT, + tmp_path, ): """Test writing events during lock leading to overflow the queue causes the database to unlock.""" # Use file DB, in memory DB cannot do write locks. diff --git a/tests/components/recorder/test_pool.py b/tests/components/recorder/test_pool.py index ca6a88d84a70ad..d39d916dc9828f 100644 --- a/tests/components/recorder/test_pool.py +++ b/tests/components/recorder/test_pool.py @@ -1,6 +1,7 @@ """Test pool.""" import threading +import pytest from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker @@ -28,30 +29,29 @@ def _get_connection_twice(): connections.append(session.connection().connection.connection) session.close() - _get_connection_twice() - assert "accesses the database without the database executor" in caplog.text - assert connections[0] != connections[1] + with pytest.raises(RuntimeError): + _get_connection_twice() caplog.clear() new_thread = threading.Thread(target=_get_connection_twice) new_thread.start() new_thread.join() assert "accesses the database without the database executor" in caplog.text - assert connections[2] != connections[3] + assert connections[0] != connections[1] caplog.clear() new_thread = threading.Thread(target=_get_connection_twice, name=DB_WORKER_PREFIX) new_thread.start() new_thread.join() assert "accesses the database without the database executor" not in caplog.text - assert connections[4] == connections[5] + assert connections[2] == connections[3] caplog.clear() new_thread = threading.Thread(target=_get_connection_twice, name="Recorder") new_thread.start() new_thread.join() assert "accesses the database without the database executor" not in caplog.text - assert connections[6] == connections[7] + assert connections[4] == connections[5] shutdown = True caplog.clear() @@ -59,4 +59,4 @@ def _get_connection_twice(): new_thread.start() new_thread.join() assert "accesses the database without the database executor" not in caplog.text - assert connections[8] != connections[9] + assert connections[6] != connections[7] diff --git a/tests/components/recorder/test_util.py b/tests/components/recorder/test_util.py index 07334fef1c3d4c..433cbdb452b7cd 100644 --- a/tests/components/recorder/test_util.py +++ b/tests/components/recorder/test_util.py @@ -597,8 +597,12 @@ def test_periodic_db_cleanups(hass_recorder): assert str(text_obj) == "PRAGMA wal_checkpoint(TRUNCATE);" +@patch("homeassistant.components.recorder.pool._raise_if_main_thread") async def test_write_lock_db( - hass: HomeAssistant, async_setup_recorder_instance: SetupRecorderInstanceT, tmp_path + mock_main_thread, + hass: HomeAssistant, + async_setup_recorder_instance: SetupRecorderInstanceT, + tmp_path, ): """Test database write lock.""" from sqlalchemy.exc import OperationalError diff --git a/tests/conftest.py b/tests/conftest.py index 0155a965fe6ec2..838e962e4bd1de 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -692,9 +692,8 @@ async def _async_init_recorder_component(hass, add_config=None): if recorder.CONF_COMMIT_INTERVAL not in config: config[recorder.CONF_COMMIT_INTERVAL] = 0 - with patch( - "homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", - True, + with patch("homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", True,), patch( + "homeassistant.components.recorder.pool._raise_if_main_thread", ), patch("homeassistant.components.recorder.migration.migrate_schema"): assert await async_setup_component( hass, recorder.DOMAIN, {recorder.DOMAIN: config} From 9b1c7416d2af8c01e48f1264d6c1d7b6730d8f84 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 9 May 2022 08:54:49 -0500 Subject: [PATCH 2/3] add coverage --- homeassistant/components/recorder/pool.py | 24 +++++++++-------- homeassistant/util/async_.py | 13 ++++++--- tests/common.py | 6 ++--- tests/components/recorder/test_init.py | 32 +++++++++++------------ tests/components/recorder/test_pool.py | 10 ++++--- tests/components/recorder/test_util.py | 4 +-- tests/conftest.py | 6 ++--- 7 files changed, 53 insertions(+), 42 deletions(-) diff --git a/homeassistant/components/recorder/pool.py b/homeassistant/components/recorder/pool.py index 0d5f81369d977c..e7a3dc9e3f212e 100644 --- a/homeassistant/components/recorder/pool.py +++ b/homeassistant/components/recorder/pool.py @@ -8,6 +8,7 @@ from sqlalchemy.pool import NullPool, SingletonThreadPool, StaticPool from homeassistant.helpers.frame import report +from homeassistant.util.async_ import check_loop from .const import DB_WORKER_PREFIX @@ -19,15 +20,9 @@ POOL_SIZE = 5 - -def _raise_if_main_thread() -> None: - """Raise an exception if we are running in the main thread.""" - if threading.current_thread() == threading.main_thread(): - raise RuntimeError( - "Detected database access from the event loop; This is causing stability issues; " - "Use homeassistant.components.recorder.get_instance(hass).async_add_executor_job() " - "for database operations" - ) +EVENT_LOOP_ADVISE_MSG = ( + "Use homeassistant.components.recorder.get_instance(hass).async_add_executor_job()" +) class RecorderPool(SingletonThreadPool, NullPool): # type: ignore[misc] @@ -72,10 +67,17 @@ def dispose(self) -> None: def _do_get(self) -> Any: if self.recorder_or_dbworker: return super()._do_get() - _raise_if_main_thread() + check_loop( + self._do_get_db_connection_protected, + strict=True, + advise_msg=EVENT_LOOP_ADVISE_MSG, + ) + return self._do_get_db_connection_protected() + + def _do_get_db_connection_protected(self) -> Any: report( "accesses the database without the database executor; " - "Use homeassistant.components.recorder.get_instance(hass).async_add_executor_job() " + f"{EVENT_LOOP_ADVISE_MSG} " "for faster database operations", exclude_integrations={"recorder"}, error_if_core=False, diff --git a/homeassistant/util/async_.py b/homeassistant/util/async_.py index 4e7d05f7c2e627..8b96e85664da14 100644 --- a/homeassistant/util/async_.py +++ b/homeassistant/util/async_.py @@ -94,8 +94,14 @@ def run_callback() -> None: return future -def check_loop(func: Callable[..., Any], strict: bool = True) -> None: - """Warn if called inside the event loop. Raise if `strict` is True.""" +def check_loop( + func: Callable[..., Any], strict: bool = True, advise_msg: str | None = None +) -> None: + """Warn if called inside the event loop. Raise if `strict` is True. + + The default advisory message is 'Use `await hass.async_add_executor_job()' + Set `advise_msg` to an alternate message if the the solution differs. + """ try: get_running_loop() in_loop = True @@ -134,6 +140,7 @@ def check_loop(func: Callable[..., Any], strict: bool = True) -> None: if found_frame is None: raise RuntimeError( f"Detected blocking call to {func.__name__} inside the event loop. " + f"{advise_msg or 'Use `await hass.async_add_executor_job()`'}; " "This is causing stability issues. Please report issue" ) @@ -160,7 +167,7 @@ def check_loop(func: Callable[..., Any], strict: bool = True) -> None: if strict: raise RuntimeError( "Blocking calls must be done in the executor or a separate thread; " - "Use `await hass.async_add_executor_job()` " + f"{advise_msg or 'Use `await hass.async_add_executor_job()`'}; " f"at {found_frame.filename[index:]}, line {found_frame.lineno}: {(found_frame.line or '?').strip()}" ) diff --git a/tests/common.py b/tests/common.py index 2289d5a0617ac6..bd0b828737b5bc 100644 --- a/tests/common.py +++ b/tests/common.py @@ -905,9 +905,9 @@ def init_recorder_component(hass, add_config=None): if recorder.CONF_COMMIT_INTERVAL not in config: config[recorder.CONF_COMMIT_INTERVAL] = 0 - with patch("homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", True,), patch( - "homeassistant.components.recorder.pool._raise_if_main_thread", - ), patch("homeassistant.components.recorder.migration.migrate_schema"): + with patch("homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", True), patch( + "homeassistant.components.recorder.migration.migrate_schema" + ): assert setup_component(hass, recorder.DOMAIN, {recorder.DOMAIN: config}) assert recorder.DOMAIN in hass.config.components _LOGGER.info( diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py index c55a94a34e8cfc..49133dd67bd60f 100644 --- a/tests/components/recorder/test_init.py +++ b/tests/components/recorder/test_init.py @@ -951,8 +951,7 @@ def test_statistics_runs_initiated(hass_recorder): ) - timedelta(minutes=5) -@patch("homeassistant.components.recorder.pool._raise_if_main_thread") -def test_compile_missing_statistics(mock_main_thread, tmpdir): +def test_compile_missing_statistics(tmpdir): """Test missing statistics are compiled on startup.""" now = dt_util.utcnow().replace(minute=0, second=0, microsecond=0) test_db_file = tmpdir.mkdir("sqlite").join("test_run_info.db") @@ -1169,8 +1168,7 @@ def test_service_disable_states_not_recording(hass, hass_recorder): assert db_states[0].to_native() == _state_with_context(hass, "test.two") -@patch("homeassistant.components.recorder.pool._raise_if_main_thread") -def test_service_disable_run_information_recorded(mock_main_thread, tmpdir): +def test_service_disable_run_information_recorded(tmpdir): """Test that runs are still recorded when recorder is disabled.""" test_db_file = tmpdir.mkdir("sqlite").join("test_run_info.db") dburl = f"{SQLITE_URL_PREFIX}//{test_db_file}" @@ -1320,9 +1318,7 @@ def test_entity_id_filter(hass_recorder): assert len(db_events) == idx + 1, data -@patch("homeassistant.components.recorder.pool._raise_if_main_thread") async def test_database_lock_and_unlock( - mock_main_thread, hass: HomeAssistant, async_setup_recorder_instance: SetupRecorderInstanceT, tmp_path, @@ -1336,6 +1332,10 @@ async def test_database_lock_and_unlock( await async_setup_recorder_instance(hass, config) await hass.async_block_till_done() + def _get_db_events(): + with session_scope(hass=hass) as session: + return list(session.query(Events).filter_by(event_type=event_type)) + instance: Recorder = hass.data[DATA_INSTANCE] assert await instance.lock_database() @@ -1350,22 +1350,17 @@ async def test_database_lock_and_unlock( # Recording can't be finished while lock is held with pytest.raises(asyncio.TimeoutError): await asyncio.wait_for(asyncio.shield(task), timeout=1) - - with session_scope(hass=hass) as session: - db_events = list(session.query(Events).filter_by(event_type=event_type)) + db_events = await hass.async_add_executor_job(_get_db_events) assert len(db_events) == 0 assert instance.unlock_database() await task - with session_scope(hass=hass) as session: - db_events = list(session.query(Events).filter_by(event_type=event_type)) - assert len(db_events) == 1 + db_events = await hass.async_add_executor_job(_get_db_events) + assert len(db_events) == 1 -@patch("homeassistant.components.recorder.pool._raise_if_main_thread") async def test_database_lock_and_overflow( - mock_main_thread, hass: HomeAssistant, async_setup_recorder_instance: SetupRecorderInstanceT, tmp_path, @@ -1379,6 +1374,10 @@ async def test_database_lock_and_overflow( await async_setup_recorder_instance(hass, config) await hass.async_block_till_done() + def _get_db_events(): + with session_scope(hass=hass) as session: + return list(session.query(Events).filter_by(event_type=event_type)) + instance: Recorder = hass.data[DATA_INSTANCE] with patch.object(recorder.core, "MAX_QUEUE_BACKLOG", 1), patch.object( @@ -1394,9 +1393,8 @@ async def test_database_lock_and_overflow( # even before unlocking. await async_wait_recording_done(hass) - with session_scope(hass=hass) as session: - db_events = list(session.query(Events).filter_by(event_type=event_type)) - assert len(db_events) == 1 + db_events = await hass.async_add_executor_job(_get_db_events) + assert len(db_events) == 1 assert not instance.unlock_database() diff --git a/tests/components/recorder/test_pool.py b/tests/components/recorder/test_pool.py index d39d916dc9828f..aa47ce5eb3ca65 100644 --- a/tests/components/recorder/test_pool.py +++ b/tests/components/recorder/test_pool.py @@ -9,6 +9,13 @@ from homeassistant.components.recorder.pool import RecorderPool +async def test_recorder_pool_called_from_event_loop(): + """Test we raise an exception when calling from the event loop.""" + engine = create_engine("sqlite://", poolclass=RecorderPool) + with pytest.raises(RuntimeError): + sessionmaker(bind=engine)().connection() + + def test_recorder_pool(caplog): """Test RecorderPool gives the same connection in the creating thread.""" @@ -29,9 +36,6 @@ def _get_connection_twice(): connections.append(session.connection().connection.connection) session.close() - with pytest.raises(RuntimeError): - _get_connection_twice() - caplog.clear() new_thread = threading.Thread(target=_get_connection_twice) new_thread.start() diff --git a/tests/components/recorder/test_util.py b/tests/components/recorder/test_util.py index 433cbdb452b7cd..6b1093ee038446 100644 --- a/tests/components/recorder/test_util.py +++ b/tests/components/recorder/test_util.py @@ -597,9 +597,9 @@ def test_periodic_db_cleanups(hass_recorder): assert str(text_obj) == "PRAGMA wal_checkpoint(TRUNCATE);" -@patch("homeassistant.components.recorder.pool._raise_if_main_thread") +@patch("homeassistant.components.recorder.pool.check_loop") async def test_write_lock_db( - mock_main_thread, + skip_check_loop, hass: HomeAssistant, async_setup_recorder_instance: SetupRecorderInstanceT, tmp_path, diff --git a/tests/conftest.py b/tests/conftest.py index 838e962e4bd1de..37bdb05faf7ef0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -692,9 +692,9 @@ async def _async_init_recorder_component(hass, add_config=None): if recorder.CONF_COMMIT_INTERVAL not in config: config[recorder.CONF_COMMIT_INTERVAL] = 0 - with patch("homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", True,), patch( - "homeassistant.components.recorder.pool._raise_if_main_thread", - ), patch("homeassistant.components.recorder.migration.migrate_schema"): + with patch("homeassistant.components.recorder.ALLOW_IN_MEMORY_DB", True), patch( + "homeassistant.components.recorder.migration.migrate_schema" + ): assert await async_setup_component( hass, recorder.DOMAIN, {recorder.DOMAIN: config} ) From 4e6da3a03991d4e17ef2192e7d1189f6214bff09 Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 9 May 2022 09:10:18 -0500 Subject: [PATCH 3/3] its the same message for executor as well --- homeassistant/components/recorder/pool.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/homeassistant/components/recorder/pool.py b/homeassistant/components/recorder/pool.py index e7a3dc9e3f212e..f2f952121af0d9 100644 --- a/homeassistant/components/recorder/pool.py +++ b/homeassistant/components/recorder/pool.py @@ -20,7 +20,7 @@ POOL_SIZE = 5 -EVENT_LOOP_ADVISE_MSG = ( +ADVISE_MSG = ( "Use homeassistant.components.recorder.get_instance(hass).async_add_executor_job()" ) @@ -70,14 +70,14 @@ def _do_get(self) -> Any: check_loop( self._do_get_db_connection_protected, strict=True, - advise_msg=EVENT_LOOP_ADVISE_MSG, + advise_msg=ADVISE_MSG, ) return self._do_get_db_connection_protected() def _do_get_db_connection_protected(self) -> Any: report( "accesses the database without the database executor; " - f"{EVENT_LOOP_ADVISE_MSG} " + f"{ADVISE_MSG} " "for faster database operations", exclude_integrations={"recorder"}, error_if_core=False,