by default only set logging handler if no other handler has been set to avoid double logging (#7750)
graingert authored Apr 26, 2023
1 parent 7477a63 commit 332bf01
Showing 11 changed files with 173 additions and 81 deletions.
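In short: distributed's default logging setup now walks the logger hierarchy first and skips installing its own handler whenever any handler is already registered, so user configuration such as logging.basicConfig() stays in charge. A minimal sketch of the resulting behaviour, mirroring the new subprocess tests added to distributed/tests/test_config.py below:

# Behaviour sketch (not part of this commit); mirrors
# test_default_logging_does_not_override_basic_config below.
import logging

logging.basicConfig()  # the user installs a root handler first

import distributed  # noqa: E402  (deliberately imported after basicConfig)

# Emitted once, by the root handler, in basicConfig's default format:
logging.getLogger("distributed").warning("hello")
# prints: WARNING:distributed:hello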
55 changes: 32 additions & 23 deletions distributed/config.py
@@ -1,18 +1,16 @@
from __future__ import annotations

import asyncio
import logging
import logging.config
import os
import sys
from typing import Any

import yaml

import dask
from dask.utils import import_required

from distributed.compatibility import WINDOWS, logging_names

config = dask.config.config


@@ -68,7 +66,15 @@
logger = logging.getLogger(__name__)


def _initialize_logging_old_style(config):
if sys.version_info >= (3, 11):
_logging_get_level_names_mapping = logging.getLevelNamesMapping
else:

def _logging_get_level_names_mapping() -> dict[str, int]:
return logging._nameToLevel.copy()


def _initialize_logging_old_style(config: dict[Any, Any]) -> None:
"""
Initialize logging using the "old-style" configuration scheme, e.g.:
{
@@ -79,7 +85,7 @@ def _initialize_logging_old_style(config):
}
}
"""
loggers = { # default values
loggers: dict[str, str | int] = { # default values
"distributed": "info",
"distributed.client": "warning",
"bokeh": "error",
@@ -95,50 +101,53 @@ def _initialize_logging_old_style(config):
dask.config.get("distributed.admin.log-format", config=config)
)
)
for name, level in loggers.items():
if isinstance(level, str):
level = logging_names[level.upper()]
logging_names = _logging_get_level_names_mapping()
for name, raw_level in sorted(loggers.items()):
level = (
logging_names[raw_level.upper()]
if isinstance(raw_level, str)
else raw_level
)
logger = logging.getLogger(name)
logger.setLevel(level)

# Ensure that we're not registering the logger twice in this hierarchy.
anc = None
anc = logging.getLogger(None)
already_registered = False
for ancestor in name.split("."):
if anc is None:
anc = logging.getLogger(ancestor)
else:
anc.getChild(ancestor)

if handler in anc.handlers:
for ancestor in name.split("."):
if anc.handlers:
already_registered = True
break
anc.getChild(ancestor)

if not already_registered:
logger.addHandler(handler)
logger.propagate = False


def _initialize_logging_new_style(config):
def _initialize_logging_new_style(config: dict[Any, Any]) -> None:
"""
Initialize logging using logging's "Configuration dictionary schema".
(ref.: https://docs.python.org/3/library/logging.config.html#configuration-dictionary-schema)
"""
base_config = _find_logging_config(config)
logging.config.dictConfig(base_config.get("logging"))
logging.config.dictConfig(base_config.get("logging")) # type: ignore[arg-type]


def _initialize_logging_file_config(config):
def _initialize_logging_file_config(config: dict[Any, Any]) -> None:
"""
Initialize logging using logging's "Configuration file format".
(ref.: https://docs.python.org/3/howto/logging.html#configuring-logging)
"""
base_config = _find_logging_config(config)
logging.config.fileConfig(
base_config.get("logging-file-config"), disable_existing_loggers=False
base_config.get("logging-file-config"), # type: ignore[arg-type]
disable_existing_loggers=False,
)


def _find_logging_config(config):
def _find_logging_config(config: dict[Any, Any]) -> dict[Any, Any]:
"""
Look for the dictionary containing logging-specific configurations,
starting in the 'distributed' dictionary and then trying the top-level
@@ -150,7 +159,7 @@ def _find_logging_config(config):
return config


def initialize_logging(config):
def initialize_logging(config: dict[Any, Any]) -> None:
base_config = _find_logging_config(config)
if "logging-file-config" in base_config:
if "logging" in base_config:
@@ -168,7 +177,7 @@ def initialize_logging(config):
_initialize_logging_old_style(config)


def initialize_event_loop(config):
def initialize_event_loop(config: dict[Any, Any]) -> None:
event_loop = dask.config.get("distributed.admin.event-loop")
if event_loop == "uvloop":
uvloop = import_required(
@@ -182,7 +191,7 @@ def initialize_event_loop(config):
)
uvloop.install()
elif event_loop in {"asyncio", "tornado"}:
if WINDOWS:
if sys.platform == "win32":
# WindowsProactorEventLoopPolicy is not compatible with tornado 6
# fallback to the pre-3.8 default of Selector
# https://github.com/tornadoweb/tornado/issues/2608
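Two notes on the config.py hunks above. The _logging_get_level_names_mapping shim exists because logging.getLevelNamesMapping() was only added in Python 3.11; on older interpreters the private logging._nameToLevel table supplies the same name-to-level mapping. The heart of the fix is the rewritten ancestor walk: rather than checking whether distributed's own handler object is already attached, it now bails out if any handler is registered anywhere on the path from the root logger down to the target. A standalone sketch of that walk, simplified and assuming the anc = anc.getChild(...) reassignment so the loop actually descends:

import logging

def handler_already_registered(name: str) -> bool:
    # Sketch of the new check in _initialize_logging_old_style: walk from
    # the root logger down towards name and report True as soon as any
    # logger on the path has a handler attached.
    anc = logging.getLogger(None)  # getLogger(None) returns the root logger
    for part in name.split("."):
        if anc.handlers:
            return True
        anc = anc.getChild(part)  # "distributed" -> "distributed.client", ...
    return False

# Usage in the spirit of the hunk above: only claim the logger (and stop
# propagation) if nothing else has configured a handler yet.
if not handler_already_registered("distributed"):
    logger = logging.getLogger("distributed")
    logger.addHandler(logging.StreamHandler())
    logger.propagate = False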
21 changes: 11 additions & 10 deletions distributed/deploy/spec.py
@@ -2,6 +2,7 @@

import asyncio
import atexit
import contextlib
import copy
import logging
import math
@@ -23,7 +24,12 @@
from distributed.deploy.cluster import Cluster
from distributed.scheduler import Scheduler
from distributed.security import Security
from distributed.utils import NoOpAwaitable, TimeoutError, import_term, silence_logging
from distributed.utils import (
NoOpAwaitable,
TimeoutError,
import_term,
silence_logging_cmgr,
)

if TYPE_CHECKING:
# TODO import from typing (requires Python >=3.11)
@@ -245,6 +251,7 @@ def __init__(
if loop is None and asynchronous:
loop = IOLoop.current()

self.__exit_stack = stack = contextlib.ExitStack()
self._created = weakref.WeakSet()

self.scheduler_spec = copy.copy(scheduler)
@@ -257,10 +264,8 @@ def __init__(
self._futures = set()

if silence_logs:
self._old_logging_level = silence_logging(level=silence_logs)
self._old_bokeh_logging_level = silence_logging(
level=silence_logs, root="bokeh"
)
stack.enter_context(silence_logging_cmgr(level=silence_logs))
stack.enter_context(silence_logging_cmgr(level=silence_logs, root="bokeh"))

self._instances.add(self)
self._correct_state_waiting = None
@@ -458,11 +463,7 @@ async def _close(self):
Status.failed,
}, w.status

if hasattr(self, "_old_logging_level"):
silence_logging(self._old_logging_level)
if hasattr(self, "_old_bokeh_logging_level"):
silence_logging(self._old_bokeh_logging_level, root="bokeh")

self.__exit_stack.__exit__(None, None, None)
await super()._close()

async def __aenter__(self):
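In SpecCluster, the paired silence_logging() calls that stashed previous levels on _old_logging_level / _old_bokeh_logging_level give way to a contextlib.ExitStack: both silencers are registered up front in __init__ and unwound together in _close. A simplified stand-in for silence_logging_cmgr (the real helper lives in distributed.utils; this sketch only adjusts the logger's level):

import contextlib
import logging

@contextlib.contextmanager
def silence_logging_cmgr(level=logging.CRITICAL, root="distributed"):
    # Simplified stand-in: raise the logger's threshold on entry and
    # restore the previous one on exit.
    logger = logging.getLogger(root)
    old_level = logger.level
    logger.setLevel(level)
    try:
        yield
    finally:
        logger.setLevel(old_level)

# Registering on an ExitStack defers the restores until the stack unwinds:
stack = contextlib.ExitStack()
stack.enter_context(silence_logging_cmgr(level=logging.CRITICAL))
stack.enter_context(silence_logging_cmgr(level=logging.CRITICAL, root="bokeh"))
# ... later, in _close():
stack.__exit__(None, None, None)  # both loggers restored, in reverse order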
6 changes: 4 additions & 2 deletions distributed/nanny.py
@@ -52,7 +52,7 @@
json_load_robust,
log_errors,
parse_ports,
silence_logging,
silence_logging_cmgr,
wait_for,
)
from distributed.worker import Worker, run
@@ -164,6 +164,7 @@ def __init__( # type: ignore[no-untyped-def]
stacklevel=2,
)

self.__exit_stack = stack = contextlib.ExitStack()
self.process = None
self._setup_logging(logger)
self.loop = self.io_loop = IOLoop.current()
@@ -252,7 +253,7 @@ def __init__( # type: ignore[no-untyped-def]
self.quiet = quiet

if silence_logs:
silence_logging(level=silence_logs)
stack.enter_context(silence_logging_cmgr(level=silence_logs))
self.silence_logs = silence_logs

handlers = {
@@ -611,6 +612,7 @@ async def close(
await self.rpc.close()
self.status = Status.closed
await super().close()
self.__exit_stack.__exit__(None, None, None)
return "OK"

async def _log_event(self, topic, msg):
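Nanny gets the same treatment: an exit stack is created in __init__, the log silencer is registered on it, and the stack is unwound at the end of close(). The general shape of the pattern, sketched with a hypothetical Component class:

import contextlib

from distributed.utils import silence_logging_cmgr  # added by this commit

class Component:
    # Hypothetical class illustrating the register-then-unwind pattern.
    def __init__(self, silence_logs=False):
        self._exit_stack = contextlib.ExitStack()
        if silence_logs:
            self._exit_stack.enter_context(silence_logging_cmgr(level=silence_logs))

    async def close(self):
        # Unwind every registered context in reverse order;
        # close() is equivalent to __exit__(None, None, None).
        self._exit_stack.close()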
10 changes: 6 additions & 4 deletions distributed/tests/test_client.py
@@ -2245,14 +2245,16 @@ async def test_cancel_multi_client(s, a, b):


@gen_cluster(nthreads=[("", 1)], client=True)
async def test_cancel_before_known_to_scheduler(c, s, a, caplog):
async def test_cancel_before_known_to_scheduler(c, s, a):
f = c.submit(inc, 1)
f2 = c.submit(inc, f)
await c.cancel([f])
assert f.cancelled()

with pytest.raises(CancelledError):
await f
while f"Scheduler cancels key {f.key}" not in caplog.text:
await asyncio.sleep(0.05)
await f2

assert any(f"Scheduler cancels key {f.key}" in msg for _, msg in s.get_logs())


@gen_cluster(client=True)
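The caplog fixture drops out of test_cancel_before_known_to_scheduler because, as the comment added to test_logging_default below explains, the default configuration sets propagate=False once distributed's handler is installed, so records never reach the root logger where pytest's caplog listens. The assertion therefore reads the scheduler's own in-memory log buffer:

# s.get_logs() returns two-item records whose second element is the
# formatted message (hence the "for _, msg in ..." unpacking), independent
# of any handler or propagation configuration:
assert any(f"Scheduler cancels key {f.key}" in msg for _, msg in s.get_logs())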
75 changes: 59 additions & 16 deletions distributed/tests/test_config.py
@@ -3,15 +3,17 @@
import contextlib
import logging
import os
import re
import subprocess
import sys
import tempfile
import textwrap

import pytest
import yaml

from distributed.config import initialize_logging
from distributed.utils_test import new_config, new_config_file
from distributed.utils_test import captured_handler, new_config, new_config_file


@pytest.mark.parametrize(
@@ -38,26 +40,31 @@ def test_logging_default(caplog, config):
f = logging.getLogger("foo")
fb = logging.getLogger("foo.bar")

d.debug("1: debug")
d.info("2: info")
dfb.info("3: info")
fb.info("4: info")
fb.error("5: error")
f.info("6: info")
f.error("7: error")
dfc.info("8: ignore me")
dfc.warning("9: important")

expected_logs = [
("distributed", logging.INFO, "2: info"),
("distributed.foo.bar", logging.INFO, "3: info"),
with captured_handler(d.handlers[0]) as distributed_log:
d.debug("1: debug")
d.info("2: info")
dfb.info("3: info")
fb.info("4: info")
fb.error("5: error")
f.info("6: info")
f.error("7: error")
dfc.info("8: ignore me")
dfc.warning("9: important")

# default logging sets propagate=False so caplog does not capture
# distributed log records
assert caplog.record_tuples == [
# Info logs of foreign libraries are not logged because default is
# WARNING
("foo.bar", logging.ERROR, "5: error"),
("foo", logging.ERROR, "7: error"),
("distributed.client", logging.WARN, "9: important"),
]
assert expected_logs == caplog.record_tuples
assert re.match(
r"\A\d+-\d+-\d+ \d+:\d+:\d+,\d+ - distributed - INFO - 2: info\n"
r"\d+-\d+-\d+ \d+:\d+:\d+,\d+ - distributed.foo.bar - INFO - 3: info\n"
r"\d+-\d+-\d+ \d+:\d+:\d+,\d+ - distributed.client - WARNING - 9: important\n\Z",
distributed_log.getvalue(),
)


def test_logging_simple_under_distributed():
@@ -190,6 +197,42 @@ def test_logging_extended():
subprocess.check_call([sys.executable, "-c", code])


def test_default_logging_does_not_override_basic_config():
code = textwrap.dedent(
"""\
import logging
logging.basicConfig()
import distributed
logging.getLogger("distributed").warning("hello")
"""
)
proc = subprocess.run(
[sys.executable, "-c", code], check=True, capture_output=True, encoding="utf8"
)
assert proc.stdout == ""
assert proc.stderr == "WARNING:distributed:hello\n"


def test_basic_config_does_not_override_default_logging():
code = textwrap.dedent(
"""\
import logging
import distributed
logging.basicConfig()
logging.getLogger("distributed").warning("hello")
"""
)
proc = subprocess.run(
[sys.executable, "-c", code], check=True, capture_output=True, encoding="utf8"
)
assert proc.stdout == ""
assert re.match(
r"\A\d+-\d+-\d+ \d+:\d+:\d+,\d+ - distributed - WARNING - hello\n\Z",
proc.stderr,
)


def test_logging_mutual_exclusive():
"""
Ensure that 'logging-file-config' and 'logging' are mutually exclusive.
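captured_handler, imported from distributed.utils_test and used by test_logging_default above and test_gather_failing_cnn_recover below, is not shown in this diff. A plausible sketch of such a helper, assuming the handler is a logging.StreamHandler: swap its stream for a StringIO for the duration of the block and yield it, so the test can assert on the exact formatted output.

import contextlib
import io
import logging

@contextlib.contextmanager
def captured_handler(handler):
    # Plausible reimplementation of distributed.utils_test.captured_handler:
    # point the StreamHandler's stream at a StringIO for the duration of the
    # block; the caller asserts on .getvalue() afterwards.
    assert isinstance(handler, logging.StreamHandler)
    original_stream = handler.stream
    handler.stream = io.StringIO()
    try:
        yield handler.stream
    finally:
        handler.stream = original_stream

The re.match assertions then pin distributed's default log format, per the regexes: %(asctime)s - %(name)s - %(levelname)s - %(message)s, timestamp included.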
16 changes: 10 additions & 6 deletions distributed/tests/test_scheduler.py
@@ -52,6 +52,7 @@
BrokenComm,
assert_story,
async_poll_for,
captured_handler,
captured_logger,
cluster,
dec,
@@ -2728,17 +2729,20 @@ async def connect(self, *args, **kwargs):


@gen_cluster(client=True)
async def test_gather_failing_cnn_recover(c, s, a, b, caplog):
async def test_gather_failing_cnn_recover(c, s, a, b):
x = await c.scatter({"x": 1}, workers=a.address)
rpc = await FlakyConnectionPool(failing_connections=1)
with mock.patch.object(s, "rpc", rpc), dask.config.set(
{"distributed.comm.retry.count": 1}
), caplog.at_level(logging.INFO):
caplog.clear()
), captured_handler(
logging.getLogger("distributed").handlers[0]
) as distributed_log:
res = await s.gather(keys=["x"])
assert [
record.message for record in caplog.records if record.levelno == logging.INFO
] == ["Retrying get_data_from_worker after exception in attempt 0/1: "]
assert re.match(
r"\A\d+-\d+-\d+ \d+:\d+:\d+,\d+ - distributed.utils_comm - INFO - "
r"Retrying get_data_from_worker after exception in attempt 0/1: \n\Z",
distributed_log.getvalue(),
)
assert res["status"] == "OK"

