Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor EventManager setup and interaction #9180

Merged
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/dbt/adapters/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
)
from dbt.common.events.functions import fire_event, fire_event_if
from dbt.adapters.events.types import CacheAction, CacheDumpGraph
from dbt.utils import lowercase
from dbt.common.utils.formatting import lowercase


def dot_separated(key: _ReferenceKey) -> str:
Expand Down
5 changes: 3 additions & 2 deletions core/dbt/adapters/events/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
AdapterEventWarning,
AdapterEventError,
)
from dbt.common.events import get_event_manager
from dbt.common.events.contextvars import get_node_info
from dbt.common.events.event_handler import set_package_logging
from dbt.common.events.functions import fire_event, EVENT_MANAGER
from dbt.common.events.functions import fire_event


@dataclass
Expand Down Expand Up @@ -63,4 +64,4 @@ def set_adapter_dependency_log_level(package_name, level):
"""By default, dbt suppresses non-dbt package logs. This method allows
you to set the log level for a specific package.
"""
set_package_logging(package_name, level, EVENT_MANAGER)
set_package_logging(package_name, level, get_event_manager())
8 changes: 5 additions & 3 deletions core/dbt/cli/requires.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import dbt.tracking
from dbt.common.invocation import reset_invocation_id
from dbt.version import installed as installed_version
from dbt.adapters.factory import adapter_management, register_adapter
from dbt.flags import set_flags, get_flag_dict
Expand All @@ -14,9 +15,8 @@
from dbt.common.events.functions import (
fire_event,
LOG_VERSION,
set_invocation_id,
setup_event_logger,
)
from dbt.events.logging import setup_event_logger
from dbt.common.events.types import (
CommandCompleted,
MainReportVersion,
Expand Down Expand Up @@ -52,9 +52,11 @@ def wrapper(*args, **kwargs):
ctx.obj["flags"] = flags
set_flags(flags)

# Reset invocation_id for each 'invocation' of a dbt command (can happen multiple times in a single process)
reset_invocation_id()

# Logging
callbacks = ctx.obj.get("callbacks", [])
set_invocation_id()
setup_event_logger(flags=flags, callbacks=callbacks)

# Tracking
Expand Down
9 changes: 9 additions & 0 deletions core/dbt/common/events/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from dbt.common.events.base_types import EventLevel
from dbt.common.events.event_manager_client import get_event_manager
from dbt.common.events.functions import get_stdout_config
from dbt.common.events.logger import LineFormat

# make sure event manager starts with a logger
get_event_manager().add_logger(
get_stdout_config(LineFormat.PlainText, True, EventLevel.INFO, False)
)
8 changes: 2 additions & 6 deletions core/dbt/common/events/base_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from dbt.common.events.helpers import get_json_string_utcnow
from typing import Optional

from dbt.common.invocation import get_invocation_id

if sys.version_info >= (3, 8):
from typing import Protocol
else:
Expand All @@ -26,12 +28,6 @@ def get_global_metadata_vars() -> dict:
return get_metadata_vars()


def get_invocation_id() -> str:
from dbt.common.events.functions import get_invocation_id

return get_invocation_id()


# exactly one pid per concrete event
def get_pid() -> int:
return os.getpid()
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/common/events/event_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from dbt.common.events.base_types import EventLevel
from dbt.common.events.types import Note
from dbt.common.events.eventmgr import IEventManager
from dbt.common.events.event_manager import IEventManager


_log_level_to_event_level_map = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import os
import traceback
from typing import Callable, List, Optional, Protocol, Tuple
from uuid import uuid4

from dbt.common.events.base_types import BaseEvent, EventLevel, msg_from_base_event, EventMsg
from dbt.common.events.logger import LoggerConfig, _Logger, _TextLogger, _JsonLogger, LineFormat
Expand All @@ -11,7 +10,6 @@ class EventManager:
def __init__(self) -> None:
self.loggers: List[_Logger] = []
self.callbacks: List[Callable[[EventMsg], None]] = []
self.invocation_id: str = str(uuid4())

def fire_event(self, e: BaseEvent, level: Optional[EventLevel] = None) -> None:
msg = msg_from_base_event(e, level=level)
Expand Down Expand Up @@ -45,7 +43,6 @@ def flush(self) -> None:

class IEventManager(Protocol):
callbacks: List[Callable[[EventMsg], None]]
invocation_id: str
loggers: List[_Logger]

def fire_event(self, e: BaseEvent, level: Optional[EventLevel] = None) -> None:
Expand Down
29 changes: 29 additions & 0 deletions core/dbt/common/events/event_manager_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Since dbt-rpc does not do its own log setup, and since some events can
# currently fire before logs can be configured by setup_event_logger(), we
# create a default configuration with default settings and no file output.
from dbt.common.events.event_manager import IEventManager, EventManager

_EVENT_MANAGER: IEventManager = EventManager()


def get_event_manager() -> IEventManager:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea of using "accessor" functions for the globals we still have, which will give us some flexibility to get away from actually using globals.

We use this pattern elsewhere (e.g. with fire_event) and I've wondered if we should make the nature of the functions more obvious by naming them with a "ctx_" prefix to make it clear we're accessing execution context?

global _EVENT_MANAGER
return _EVENT_MANAGER


def add_logger_to_manager(logger) -> None:
global _EVENT_MANAGER
_EVENT_MANAGER.add_logger(logger)


def ctx_set_event_manager(event_manager: IEventManager) -> None:
global _EVENT_MANAGER
_EVENT_MANAGER = event_manager


def cleanup_event_logger() -> None:
# Reset to a no-op manager to release streams associated with logs. This is
# especially important for tests, since pytest replaces the stdout stream
# during test runs, and closes the stream after the test is over.
_EVENT_MANAGER.loggers.clear()
_EVENT_MANAGER.callbacks.clear()
172 changes: 29 additions & 143 deletions core/dbt/common/events/functions.py
Original file line number Diff line number Diff line change
@@ -1,95 +1,41 @@
from pathlib import Path

from dbt.common.events.event_manager_client import get_event_manager
from dbt.common.invocation import get_invocation_id
from dbt.common.helper_types import WarnErrorOptions
from dbt.common.utils import ForgivingJSONEncoder
from dbt.common.events.base_types import BaseEvent, EventLevel, EventMsg
from dbt.common.events.eventmgr import EventManager, IEventManager
from dbt.common.events.logger import LoggerConfig, LineFormat
from dbt.common.exceptions import scrub_secrets, env_secrets
from dbt.common.events.types import Note
from functools import partial
import json
import os
import sys
from typing import Callable, Dict, List, Optional, TextIO, Union
import uuid
from typing import Callable, Dict, Optional, TextIO, Union
from google.protobuf.json_format import MessageToDict

LOG_VERSION = 3
metadata_vars: Optional[Dict[str, str]] = None
_METADATA_ENV_PREFIX = "DBT_ENV_CUSTOM_ENV_"
# These are the logging events issued by the "clean" command,
# where we can't count on having a log directory. We've removed
# the "class" flags on the events in types.py. If necessary we
# could still use class or method flags, but we'd have to get
# the type class from the msg and then get the information from the class.
nofile_codes = ["Z012", "Z013", "Z014", "Z015"]
WARN_ERROR_OPTIONS = WarnErrorOptions(include=[], exclude=[])
WARN_ERROR = False

# This global, and the following two functions for capturing stdout logs are
# an unpleasant hack we intend to remove as part of API-ification. The GitHub
# issue #6350 was opened for that work.
CAPTURE_STREAM: Optional[TextIO] = None

def make_log_dir_if_missing(log_path: Union[Path, str]) -> None:
if isinstance(log_path, str):
log_path = Path(log_path)
log_path.mkdir(parents=True, exist_ok=True)


def setup_event_logger(flags, callbacks: List[Callable[[EventMsg], None]] = []) -> None:
cleanup_event_logger()
make_log_dir_if_missing(flags.LOG_PATH)
EVENT_MANAGER.callbacks = callbacks.copy()

if flags.LOG_LEVEL != "none":
line_format = _line_format_from_str(flags.LOG_FORMAT, LineFormat.PlainText)
log_level = (
EventLevel.ERROR
if flags.QUIET
else EventLevel.DEBUG
if flags.DEBUG
else EventLevel(flags.LOG_LEVEL)
)
console_config = _get_stdout_config(
line_format,
flags.USE_COLORS,
log_level,
flags.LOG_CACHE_EVENTS,
)
EVENT_MANAGER.add_logger(console_config)

if _CAPTURE_STREAM:
# Create second stdout logger to support test which want to know what's
# being sent to stdout.
console_config.output_stream = _CAPTURE_STREAM
EVENT_MANAGER.add_logger(console_config)

if flags.LOG_LEVEL_FILE != "none":
# create and add the file logger to the event manager
log_file = os.path.join(flags.LOG_PATH, "dbt.log")
log_file_format = _line_format_from_str(flags.LOG_FORMAT_FILE, LineFormat.DebugText)
log_level_file = EventLevel.DEBUG if flags.DEBUG else EventLevel(flags.LOG_LEVEL_FILE)
EVENT_MANAGER.add_logger(
_get_logfile_config(
log_file,
flags.USE_COLORS_FILE,
log_file_format,
log_level_file,
flags.LOG_FILE_MAX_BYTES,
)
)


def _line_format_from_str(format_str: str, default: LineFormat) -> LineFormat:
if format_str == "text":
return LineFormat.PlainText
elif format_str == "debug":
return LineFormat.DebugText
elif format_str == "json":
return LineFormat.Json

return default
def stdout_filter(
log_cache_events: bool,
line_format: LineFormat,
msg: EventMsg,
) -> bool:
return msg.info.name not in ["CacheAction", "CacheDumpGraph"] or log_cache_events


def _get_stdout_config(
def get_stdout_config(
line_format: LineFormat,
use_colors: bool,
level: EventLevel,
Expand All @@ -102,83 +48,38 @@ def _get_stdout_config(
line_format=line_format,
scrubber=env_scrubber,
filter=partial(
_stdout_filter,
stdout_filter,
log_cache_events,
line_format,
),
invocation_id=EVENT_MANAGER.invocation_id,
invocation_id=get_invocation_id(),
output_stream=sys.stdout,
)


def _stdout_filter(
log_cache_events: bool,
line_format: LineFormat,
msg: EventMsg,
) -> bool:
return msg.info.name not in ["CacheAction", "CacheDumpGraph"] or log_cache_events


def _get_logfile_config(
log_path: str,
use_colors: bool,
line_format: LineFormat,
level: EventLevel,
log_file_max_bytes: int,
log_cache_events: bool = False,
) -> LoggerConfig:
return LoggerConfig(
name="file_log",
line_format=line_format,
use_colors=use_colors,
level=level, # File log is *always* debug level
scrubber=env_scrubber,
filter=partial(_logfile_filter, log_cache_events, line_format),
invocation_id=EVENT_MANAGER.invocation_id,
output_file_name=log_path,
output_file_max_bytes=log_file_max_bytes,
)


def _logfile_filter(log_cache_events: bool, line_format: LineFormat, msg: EventMsg) -> bool:
return msg.info.code not in nofile_codes and not (
msg.info.name in ["CacheAction", "CacheDumpGraph"] and not log_cache_events
)
def make_log_dir_if_missing(log_path: Union[Path, str]) -> None:
if isinstance(log_path, str):
log_path = Path(log_path)
log_path.mkdir(parents=True, exist_ok=True)


def env_scrubber(msg: str) -> str:
return scrub_secrets(msg, env_secrets())


def cleanup_event_logger() -> None:
# Reset to a no-op manager to release streams associated with logs. This is
# especially important for tests, since pytest replaces the stdout stream
# during test runs, and closes the stream after the test is over.
EVENT_MANAGER.loggers.clear()
EVENT_MANAGER.callbacks.clear()


# Since dbt-rpc does not do its own log setup, and since some events can
# currently fire before logs can be configured by setup_event_logger(), we
# create a default configuration with default settings and no file output.
EVENT_MANAGER: IEventManager = EventManager()
EVENT_MANAGER.add_logger(_get_stdout_config(LineFormat.PlainText, True, EventLevel.INFO, False))

# This global, and the following two functions for capturing stdout logs are
# an unpleasant hack we intend to remove as part of API-ification. The GitHub
# issue #6350 was opened for that work.
_CAPTURE_STREAM: Optional[TextIO] = None


# used for integration tests
def capture_stdout_logs(stream: TextIO) -> None:
global _CAPTURE_STREAM
_CAPTURE_STREAM = stream
global CAPTURE_STREAM
CAPTURE_STREAM = stream


def stop_capture_stdout_logs() -> None:
global _CAPTURE_STREAM
_CAPTURE_STREAM = None
global CAPTURE_STREAM
CAPTURE_STREAM = None


def get_capture_stream() -> Optional[TextIO]:
return CAPTURE_STREAM


# returns a dictionary representation of the event fields.
Expand Down Expand Up @@ -242,7 +143,7 @@ def fire_event_if_test(
# (i.e. - mutating the event history, printing to stdout, logging
# to files, etc.)
def fire_event(e: BaseEvent, level: Optional[EventLevel] = None) -> None:
EVENT_MANAGER.fire_event(e, level=level)
get_event_manager().fire_event(e, level=level)


def get_metadata_vars() -> Dict[str, str]:
Expand All @@ -259,18 +160,3 @@ def get_metadata_vars() -> Dict[str, str]:
def reset_metadata_vars() -> None:
global metadata_vars
metadata_vars = None


def get_invocation_id() -> str:
return EVENT_MANAGER.invocation_id


def set_invocation_id() -> None:
# This is primarily for setting the invocation_id for separate
# commands in the dbt servers. It shouldn't be necessary for the CLI.
EVENT_MANAGER.invocation_id = str(uuid.uuid4())


def ctx_set_event_manager(event_manager: IEventManager) -> None:
global EVENT_MANAGER
EVENT_MANAGER = event_manager
Loading
Loading