Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 4 additions & 12 deletions homeassistant/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@
from homeassistant import config as conf_util, config_entries, core, loader
from homeassistant.components import http
from homeassistant.const import (
EVENT_HOMEASSISTANT_CLOSE,
EVENT_HOMEASSISTANT_STOP,
REQUIRED_NEXT_PYTHON_DATE,
REQUIRED_NEXT_PYTHON_VER,
)
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.typing import ConfigType
from homeassistant.setup import DATA_SETUP, async_setup_component
from homeassistant.util.logging import AsyncHandler
from homeassistant.util.logging import async_activate_log_queue_handler
from homeassistant.util.package import async_get_user_site, is_virtual_env
from homeassistant.util.yaml import clear_secret_cache

Expand Down Expand Up @@ -278,24 +277,17 @@ def async_enable_logging(
err_handler.setLevel(logging.INFO if verbose else logging.WARNING)
err_handler.setFormatter(logging.Formatter(fmt, datefmt=datefmt))

async_handler = AsyncHandler(hass.loop, err_handler)

async def async_stop_async_handler(_: Any) -> None:
"""Cleanup async handler."""
logging.getLogger("").removeHandler(async_handler) # type: ignore
await async_handler.async_close(blocking=True)

hass.bus.async_listen_once(EVENT_HOMEASSISTANT_CLOSE, async_stop_async_handler)

logger = logging.getLogger("")
logger.addHandler(async_handler) # type: ignore
logger.addHandler(err_handler)
logger.setLevel(logging.INFO)

# Save the log file location for access by other components.
hass.data[DATA_LOGGING] = err_log_path
else:
_LOGGER.error("Unable to set up error log %s (access denied)", err_log_path)

async_activate_log_queue_handler(hass)


async def async_mount_local_lib_path(config_dir: str) -> str:
"""Add local library to Python Path.
Expand Down
150 changes: 50 additions & 100 deletions homeassistant/util/logging.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
"""Logging utilities."""
import asyncio
from asyncio.events import AbstractEventLoop
from functools import partial, wraps
import inspect
import logging
import threading
import logging.handlers
import queue
import traceback
from typing import Any, Callable, Coroutine, Optional
from typing import Any, Callable, Coroutine

from homeassistant.const import EVENT_HOMEASSISTANT_CLOSE
from homeassistant.core import HomeAssistant, callback


class HideSensitiveDataFilter(logging.Filter):
Expand All @@ -24,104 +27,51 @@ def filter(self, record: logging.LogRecord) -> bool:
return True


class AsyncHandler:
"""Logging handler wrapper to add an async layer."""

def __init__(self, loop: AbstractEventLoop, handler: logging.Handler) -> None:
"""Initialize async logging handler wrapper."""
self.handler = handler
self.loop = loop
self._queue: asyncio.Queue = asyncio.Queue(loop=loop)
self._thread = threading.Thread(target=self._process)

# Delegate from handler
# pylint: disable=invalid-name
self.setLevel = handler.setLevel
self.setFormatter = handler.setFormatter
self.addFilter = handler.addFilter
self.removeFilter = handler.removeFilter
self.filter = handler.filter
self.flush = handler.flush
self.handle = handler.handle
self.handleError = handler.handleError
self.format = handler.format

self._thread.start()

def close(self) -> None:
"""Wrap close to handler."""
self.emit(None)

async def async_close(self, blocking: bool = False) -> None:
"""Close the handler.

When blocking=True, will wait till closed.
"""
await self._queue.put(None)

if blocking:
while self._thread.is_alive():
await asyncio.sleep(0)

def emit(self, record: Optional[logging.LogRecord]) -> None:
"""Process a record."""
ident = self.loop.__dict__.get("_thread_ident")

# inside eventloop
if ident is not None and ident == threading.get_ident():
self._queue.put_nowait(record)
# from a thread/executor
else:
self.loop.call_soon_threadsafe(self._queue.put_nowait, record)

def __repr__(self) -> str:
"""Return the string names."""
return str(self.handler)

def _process(self) -> None:
"""Process log in a thread."""
class HomeAssistantQueueHandler(logging.handlers.QueueHandler):
"""Process the log in another thread."""

def emit(self, record: logging.LogRecord) -> None:
"""Emit a log record."""
try:
while True:
record = asyncio.run_coroutine_threadsafe(
self._queue.get(), self.loop
).result()

if record is None:
self.handler.close()
return

self.handler.emit(record)
except asyncio.CancelledError:
self.handler.close()

def createLock(self) -> None: # pylint: disable=invalid-name
"""Ignore lock stuff."""

def acquire(self) -> None:
"""Ignore lock stuff."""

def release(self) -> None:
"""Ignore lock stuff."""

@property
def level(self) -> int:
"""Wrap property level to handler."""
return self.handler.level

@property
def formatter(self) -> Optional[logging.Formatter]:
"""Wrap property formatter to handler."""
return self.handler.formatter

@property
def name(self) -> str:
"""Wrap property set_name to handler."""
return self.handler.get_name() # type: ignore

@name.setter
def name(self, name: str) -> None:
"""Wrap property get_name to handler."""
self.handler.set_name(name) # type: ignore
self.enqueue(record)
except asyncio.CancelledError: # pylint: disable=try-except-raise
raise
except Exception: # pylint: disable=broad-except
self.handleError(record)


@callback
def async_activate_log_queue_handler(hass: HomeAssistant) -> None:
"""
Migrate the existing log handlers to use the queue.

This allows us to avoid blocking I/O and formatting messages
in the event loop as log messages are written in another thread.
"""
simple_queue = queue.SimpleQueue() # type: ignore
queue_handler = HomeAssistantQueueHandler(simple_queue)
logging.root.addHandler(queue_handler)

migrated_handlers = []
for handler in logging.root.handlers[:]:
if handler is queue_handler:
continue
logging.root.removeHandler(handler)
migrated_handlers.append(handler)

listener = logging.handlers.QueueListener(
simple_queue, *migrated_handlers, respect_handler_level=False
)

listener.start()

@callback
def _async_stop_queue_handler(_: Any) -> None:
"""Cleanup handler."""
logging.root.removeHandler(queue_handler)
listener.stop()

hass.bus.async_listen_once(EVENT_HOMEASSISTANT_CLOSE, _async_stop_queue_handler)


def log_exception(format_err: Callable[..., Any], *args: Any) -> None:
Expand Down
9 changes: 9 additions & 0 deletions tests/test_bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ async def test_home_assistant_core_config_validation(hass):
assert result is None


async def test_async_enable_logging(hass):
"""Test to ensure logging is migrated to the queue handlers."""
with patch("logging.getLogger"), patch(
"homeassistant.bootstrap.async_activate_log_queue_handler"
) as mock_async_activate_log_queue_handler:
bootstrap.async_enable_logging(hass)
mock_async_activate_log_queue_handler.assert_called_once()


async def test_load_hassio(hass):
"""Test that we load Hass.io component."""
with patch.dict(os.environ, {}, clear=True):
Expand Down
60 changes: 26 additions & 34 deletions tests/util/test_logging.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
"""Test Home Assistant logging util methods."""
import asyncio
import logging
import threading
import queue

import pytest

import homeassistant.util.logging as logging_util

from tests.async_mock import patch


def test_sensitive_data_filter():
"""Test the logging sensitive data filter."""
Expand All @@ -21,50 +23,40 @@ def test_sensitive_data_filter():
assert sensitive_record.msg == "******* log"


async def test_async_handler_loop_log(loop):
"""Test logging data inside from inside the event loop."""
loop._thread_ident = threading.get_ident()

queue = asyncio.Queue(loop=loop)
base_handler = logging.handlers.QueueHandler(queue)
handler = logging_util.AsyncHandler(loop, base_handler)
async def test_logging_with_queue_handler():
"""Test logging with HomeAssistantQueueHandler."""

# Test passthrough props and noop functions
assert handler.createLock() is None
assert handler.acquire() is None
assert handler.release() is None
assert handler.formatter is base_handler.formatter
assert handler.name is base_handler.get_name()
handler.name = "mock_name"
assert base_handler.get_name() == "mock_name"
simple_queue = queue.SimpleQueue() # type: ignore
handler = logging_util.HomeAssistantQueueHandler(simple_queue)

log_record = logging.makeLogRecord({"msg": "Test Log Record"})

handler.emit(log_record)
await handler.async_close(True)
assert queue.get_nowait().msg == "Test Log Record"
assert queue.empty()

with pytest.raises(asyncio.CancelledError), patch.object(
handler, "enqueue", side_effect=asyncio.CancelledError
):
handler.emit(log_record)

with patch.object(handler, "enqueue", side_effect=OSError), patch.object(
handler, "handleError"
) as mock_handle_error:
handler.emit(log_record)
mock_handle_error.assert_called_once()

async def test_async_handler_thread_log(loop):
"""Test logging data from a thread."""
loop._thread_ident = threading.get_ident()
handler.close()

queue = asyncio.Queue(loop=loop)
base_handler = logging.handlers.QueueHandler(queue)
handler = logging_util.AsyncHandler(loop, base_handler)
assert simple_queue.get_nowait().msg == "Test Log Record"
assert simple_queue.empty()

log_record = logging.makeLogRecord({"msg": "Test Log Record"})

def add_log():
"""Emit a mock log."""
handler.emit(log_record)
handler.close()
async def test_migrate_log_handler(hass):
"""Test migrating log handlers."""

await loop.run_in_executor(None, add_log)
await handler.async_close(True)
logging_util.async_activate_log_queue_handler(hass)

assert queue.get_nowait().msg == "Test Log Record"
assert queue.empty()
assert len(logging.root.handlers) == 1
assert isinstance(logging.root.handlers[0], logging_util.HomeAssistantQueueHandler)


@pytest.mark.no_fail_on_log_exception
Expand Down