Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 35 additions & 14 deletions homeassistant/components/logger/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Support for setting the level of logging for components."""
from collections import OrderedDict
import logging
import re

import voluptuous as vol

Expand Down Expand Up @@ -50,33 +50,54 @@
class HomeAssistantLogFilter(logging.Filter):
"""A log filter."""

def __init__(self, logfilter):
def __init__(self):
"""Initialize the filter."""
super().__init__()

self.logfilter = logfilter
self._default = None
self._logs = None
self._log_rx = None

def update_default_level(self, default_level):
"""Update the default logger level."""
self._default = default_level

def update_log_filter(self, logs):
"""Rebuild the internal filter from new config."""
#
# A precompiled regex is used to avoid
# the overhead of a list transversal
#
# Sort to make sure the longer
# names are always matched first
# so they take precedence of the shorter names
# to allow for more granular settings.
#
names_by_len = sorted(list(logs), key=len, reverse=True)
self._log_rx = re.compile("".join(["^(?:", "|".join(names_by_len), ")"]))
self._logs = logs

def filter(self, record):
"""Filter the log entries."""
# Log with filtered severity
if LOGGER_LOGS in self.logfilter:
for filtername in self.logfilter[LOGGER_LOGS]:
logseverity = self.logfilter[LOGGER_LOGS][filtername]
if record.name.startswith(filtername):
return record.levelno >= logseverity
if self._log_rx:
match = self._log_rx.match(record.name)
if match:
return record.levelno >= self._logs[match.group(0)]

# Log with default severity
default = self.logfilter[LOGGER_DEFAULT]
return record.levelno >= default
return record.levelno >= self._default


async def async_setup(hass, config):
"""Set up the logger component."""
logfilter = {}
hass_filter = HomeAssistantLogFilter()

def set_default_log_level(level):
"""Set the default log level for components."""
logfilter[LOGGER_DEFAULT] = LOGSEVERITY[level]
hass_filter.update_default_level(LOGSEVERITY[level])

def set_log_levels(logpoints):
"""Set the specified log levels."""
Expand All @@ -90,9 +111,9 @@ def set_log_levels(logpoints):
for key, value in logpoints.items():
logs[key] = LOGSEVERITY[value]

logfilter[LOGGER_LOGS] = OrderedDict(
sorted(logs.items(), key=lambda t: len(t[0]), reverse=True)
)
logfilter[LOGGER_LOGS] = logs

hass_filter.update_log_filter(logs)

# Set default log severity
if LOGGER_DEFAULT in config.get(DOMAIN):
Expand All @@ -106,7 +127,7 @@ def set_log_levels(logpoints):
# Set log filter for all log handler
for handler in logging.root.handlers:
handler.setLevel(logging.NOTSET)
handler.addFilter(HomeAssistantLogFilter(logfilter))
handler.addFilter(hass_filter)

if LOGGER_LOGS in config.get(DOMAIN):
set_log_levels(config.get(DOMAIN)[LOGGER_LOGS])
Expand Down
159 changes: 78 additions & 81 deletions tests/components/logger/test_init.py
Original file line number Diff line number Diff line change
@@ -1,123 +1,120 @@
"""The tests for the Logger component."""
from collections import namedtuple
import logging
import unittest

from homeassistant.components import logger
from homeassistant.setup import setup_component

from tests.common import get_test_home_assistant
from homeassistant.setup import async_setup_component

RECORD = namedtuple("record", ("name", "levelno"))

NO_DEFAULT_CONFIG = {"logger": {}}
NO_LOGS_CONFIG = {"logger": {"default": "info"}}
TEST_CONFIG = {"logger": {"default": "warning", "logs": {"test": "info"}}}
TEST_CONFIG = {
"logger": {
"default": "warning",
"logs": {"test": "info", "test.child": "debug", "test.child.child": "warning"},
}
}


async def async_setup_logger(hass, config):
"""Set up logger and save log filter."""
await async_setup_component(hass, logger.DOMAIN, config)
return logging.root.handlers[-1].filters[0]


async def test_logger_setup(hass):
"""Use logger to create a logging filter."""
await async_setup_logger(hass, TEST_CONFIG)

assert len(logging.root.handlers) > 0
handler = logging.root.handlers[-1]

class TestUpdater(unittest.TestCase):
"""Test logger component."""
assert len(handler.filters) == 1

def setUp(self):
"""Set up things to be run when tests are started."""
self.hass = get_test_home_assistant()
self.log_filter = None

def tearDown(self):
"""Stop everything that was started."""
del logging.root.handlers[-1]
self.hass.stop()
async def test_logger_test_filters(hass):
"""Test resulting filter operation."""
log_filter = await async_setup_logger(hass, TEST_CONFIG)

def setup_logger(self, config):
"""Set up logger and save log filter."""
setup_component(self.hass, logger.DOMAIN, config)
self.log_filter = logging.root.handlers[-1].filters[0]
# Blocked default record
assert not log_filter.filter(RECORD("asdf", logging.DEBUG))

def assert_logged(self, name, level):
"""Assert that a certain record was logged."""
assert self.log_filter.filter(RECORD(name, level))
# Allowed default record
assert log_filter.filter(RECORD("asdf", logging.WARNING))

def assert_not_logged(self, name, level):
"""Assert that a certain record was not logged."""
assert not self.log_filter.filter(RECORD(name, level))
# Blocked named record
assert not log_filter.filter(RECORD("test", logging.DEBUG))

def test_logger_setup(self):
"""Use logger to create a logging filter."""
self.setup_logger(TEST_CONFIG)
# Allowed named record
assert log_filter.filter(RECORD("test", logging.INFO))

assert len(logging.root.handlers) > 0
handler = logging.root.handlers[-1]
# Allowed named record child
assert log_filter.filter(RECORD("test.child", logging.INFO))

assert len(handler.filters) == 1
log_filter = handler.filters[0].logfilter
# Allowed named record child
assert log_filter.filter(RECORD("test.child", logging.DEBUG))

assert log_filter["default"] == logging.WARNING
assert log_filter["logs"]["test"] == logging.INFO
# Blocked named record child of child
assert not log_filter.filter(RECORD("test.child.child", logging.DEBUG))

def test_logger_test_filters(self):
"""Test resulting filter operation."""
self.setup_logger(TEST_CONFIG)
# Allowed named record child of child
assert log_filter.filter(RECORD("test.child.child", logging.WARNING))

# Blocked default record
self.assert_not_logged("asdf", logging.DEBUG)

# Allowed default record
self.assert_logged("asdf", logging.WARNING)
async def test_set_filter_empty_config(hass):
"""Test change log level from empty configuration."""
log_filter = await async_setup_logger(hass, NO_LOGS_CONFIG)

# Blocked named record
self.assert_not_logged("test", logging.DEBUG)
assert not log_filter.filter(RECORD("test", logging.DEBUG))

# Allowed named record
self.assert_logged("test", logging.INFO)
await hass.services.async_call(logger.DOMAIN, "set_level", {"test": "debug"})
await hass.async_block_till_done()

def test_set_filter_empty_config(self):
"""Test change log level from empty configuration."""
self.setup_logger(NO_LOGS_CONFIG)
assert log_filter.filter(RECORD("test", logging.DEBUG))

self.assert_not_logged("test", logging.DEBUG)

self.hass.services.call(logger.DOMAIN, "set_level", {"test": "debug"})
self.hass.block_till_done()
async def test_set_filter(hass):
"""Test change log level of existing filter."""
log_filter = await async_setup_logger(hass, TEST_CONFIG)

self.assert_logged("test", logging.DEBUG)
assert not log_filter.filter(RECORD("asdf", logging.DEBUG))
assert log_filter.filter(RECORD("dummy", logging.WARNING))

def test_set_filter(self):
"""Test change log level of existing filter."""
self.setup_logger(TEST_CONFIG)
await hass.services.async_call(
logger.DOMAIN, "set_level", {"asdf": "debug", "dummy": "info"}
)
await hass.async_block_till_done()

self.assert_not_logged("asdf", logging.DEBUG)
self.assert_logged("dummy", logging.WARNING)
assert log_filter.filter(RECORD("asdf", logging.DEBUG))
assert log_filter.filter(RECORD("dummy", logging.WARNING))

self.hass.services.call(
logger.DOMAIN, "set_level", {"asdf": "debug", "dummy": "info"}
)
self.hass.block_till_done()

self.assert_logged("asdf", logging.DEBUG)
self.assert_logged("dummy", logging.WARNING)
async def test_set_default_filter_empty_config(hass):
"""Test change default log level from empty configuration."""
log_filter = await async_setup_logger(hass, NO_DEFAULT_CONFIG)

def test_set_default_filter_empty_config(self):
"""Test change default log level from empty configuration."""
self.setup_logger(NO_DEFAULT_CONFIG)
assert log_filter.filter(RECORD("test", logging.DEBUG))

self.assert_logged("test", logging.DEBUG)
await hass.services.async_call(
logger.DOMAIN, "set_default_level", {"level": "warning"}
)
await hass.async_block_till_done()

self.hass.services.call(
logger.DOMAIN, "set_default_level", {"level": "warning"}
)
self.hass.block_till_done()
assert not log_filter.filter(RECORD("test", logging.DEBUG))

self.assert_not_logged("test", logging.DEBUG)

def test_set_default_filter(self):
"""Test change default log level with existing default."""
self.setup_logger(TEST_CONFIG)
async def test_set_default_filter(hass):
"""Test change default log level with existing default."""
log_filter = await async_setup_logger(hass, TEST_CONFIG)

self.assert_not_logged("asdf", logging.DEBUG)
self.assert_logged("dummy", logging.WARNING)
assert not log_filter.filter(RECORD("asdf", logging.DEBUG))
assert log_filter.filter(RECORD("dummy", logging.WARNING))

self.hass.services.call(logger.DOMAIN, "set_default_level", {"level": "debug"})
self.hass.block_till_done()
await hass.services.async_call(
logger.DOMAIN, "set_default_level", {"level": "debug"}
)
await hass.async_block_till_done()

self.assert_logged("asdf", logging.DEBUG)
self.assert_logged("dummy", logging.WARNING)
assert log_filter.filter(RECORD("asdf", logging.DEBUG))
assert log_filter.filter(RECORD("dummy", logging.WARNING))