diff --git a/cosmos/providers/dbt/core/operators/docker.py b/cosmos/providers/dbt/core/operators/docker.py index c17fa8d75a..2d9c6cbbdd 100644 --- a/cosmos/providers/dbt/core/operators/docker.py +++ b/cosmos/providers/dbt/core/operators/docker.py @@ -1,7 +1,7 @@ from __future__ import annotations import logging -from typing import Sequence +from typing import Callable, Optional, Sequence import yaml from airflow.utils.context import Context @@ -137,9 +137,13 @@ class DbtTestDockerOperator(DbtDockerBaseOperator): ui_color = "#8194E0" - def __init__(self, **kwargs) -> None: + def __init__( + self, on_warning_callback: Optional[Callable] = None, **kwargs + ) -> None: super().__init__(**kwargs) self.base_cmd = "test" + # as of now, on_warning_callback in docker executor does nothing + self.on_warning_callback = on_warning_callback def execute(self, context: Context): return self.build_and_run_cmd(context=context) diff --git a/cosmos/providers/dbt/core/operators/kubernetes.py b/cosmos/providers/dbt/core/operators/kubernetes.py index 75c5e31928..c26f72c241 100644 --- a/cosmos/providers/dbt/core/operators/kubernetes.py +++ b/cosmos/providers/dbt/core/operators/kubernetes.py @@ -1,7 +1,7 @@ from __future__ import annotations import logging -from typing import Sequence +from typing import Callable, Optional, Sequence import yaml from airflow.utils.context import Context @@ -147,9 +147,13 @@ class DbtTestKubernetesOperator(DbtKubernetesBaseOperator): ui_color = "#8194E0" - def __init__(self, **kwargs) -> None: + def __init__( + self, on_warning_callback: Optional[Callable] = None, **kwargs + ) -> None: super().__init__(**kwargs) self.base_cmd = "test" + # as of now, on_warning_callback in kubernetes executor does nothing + self.on_warning_callback = on_warning_callback def execute(self, context: Context): return self.build_and_run_cmd(context=context) diff --git a/cosmos/providers/dbt/core/operators/local.py b/cosmos/providers/dbt/core/operators/local.py index 
3d276f7569..dfee2e6c66 100644 --- a/cosmos/providers/dbt/core/operators/local.py +++ b/cosmos/providers/dbt/core/operators/local.py @@ -5,18 +5,29 @@ import shutil import signal import tempfile -from typing import Sequence +from collections import namedtuple +from typing import Callable, Optional, Sequence import yaml from airflow.compat.functools import cached_property from airflow.exceptions import AirflowException, AirflowSkipException -from airflow.hooks.subprocess import SubprocessHook, SubprocessResult from airflow.utils.context import Context from cosmos.providers.dbt.core.operators.base import DbtBaseOperator +from cosmos.providers.dbt.core.utils.adapted_subprocesshook import ( + FullOutputSubprocessHook, +) +from cosmos.providers.dbt.core.utils.warn_parsing import ( + extract_log_issues, + parse_output, +) logger = logging.getLogger(__name__) +FullOutputSubprocessResult = namedtuple( + "FullOutputSubprocessResult", ["exit_code", "output", "full_output"] +) + class DbtLocalBaseOperator(DbtBaseOperator): """ @@ -38,9 +49,9 @@ def __init__( @cached_property def subprocess_hook(self): """Returns hook for running the bash command.""" - return SubprocessHook() + return FullOutputSubprocessHook() - def exception_handling(self, result: SubprocessResult): + def exception_handling(self, result: FullOutputSubprocessResult): if self.skip_exit_code is not None and result.exit_code == self.skip_exit_code: raise AirflowSkipException( f"dbt command returned exit code {self.skip_exit_code}. Skipping." @@ -54,7 +65,7 @@ def run_command( self, cmd: list[str], env: dict[str, str], - ) -> SubprocessResult: + ) -> FullOutputSubprocessResult: """ Copies the dbt project to a temporary directory and runs the command. 
class DbtTestLocalOperator(DbtLocalBaseOperator):
    """
    Executes a dbt core test command.

    After the command finishes, the summary line is inspected and, if dbt reported at least
    one warning, ``on_warning_callback`` is invoked.

    :param on_warning_callback: A callback function called on warnings with additional Context variables "test_names"
        and "test_results" of type `List`. Each index in "test_names" corresponds to the same index in "test_results".
    """

    ui_color = "#8194E0"

    def __init__(
        self,
        on_warning_callback: Optional[Callable] = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.base_cmd = "test"
        self.on_warning_callback = on_warning_callback

    def _should_run_tests(
        self,
        result: FullOutputSubprocessResult,
        no_tests_message: str = "Nothing to do",
    ) -> bool:
        """
        Decide whether the command output should be scanned for warnings.

        :param result: The output from the build and run command.
        :param no_tests_message: Text that, when found in ``result.output``, means no tests ran.
        :return: True only if ``on_warning_callback`` is set and ``no_tests_message`` is not
            part of ``result.output``.
        """
        # bool(): a bare ``callback and ...`` would hand back the callback object (or None)
        # instead of the boolean promised by the signature.
        return (
            bool(self.on_warning_callback) and no_tests_message not in result.output
        )

    def _handle_warnings(
        self, result: FullOutputSubprocessResult, context: Context
    ) -> None:
        """
        Handles warnings by extracting log issues, creating additional context, and calling the
        on_warning_callback with the updated context.

        :param result: The result object from the build and run command.
        :param context: The original airflow context in which the build and run command was executed.
        """
        test_names, test_results = extract_log_issues(result.full_output)

        # Work on a copy so the extra keys never leak into the task's own context.
        warning_context = dict(context)
        warning_context["test_names"] = test_names
        warning_context["test_results"] = test_results

        self.on_warning_callback(warning_context)

    def execute(self, context: Context):
        """
        Run ``dbt test`` and trigger the warning callback when warnings were reported.

        :param context: The airflow context of the running task.
        :return: ``result.output`` of the executed command.
        """
        result = self.build_and_run_cmd(context=context)

        # The summary is only parsed when a callback is set and tests actually ran.
        if self._should_run_tests(result) and parse_output(result, "WARN") > 0:
            self._handle_warnings(result, context)

        return result.output
+ + If ``cwd`` is None, execute the command in a temporary directory which will be cleaned afterwards. + If ``env`` is not supplied, ``os.environ`` is passed + + :param command: the command to run + :param env: Optional dict containing environment variables to be made available to the shell + environment in which ``command`` will be executed. If omitted, ``os.environ`` will be used. + Note, that in case you have Sentry configured, original variables from the environment + will also be passed to the subprocess with ``SUBPROCESS_`` prefix. See + :doc:`/administration-and-deployment/logging-monitoring/errors` for details. + :param output_encoding: encoding to use for decoding stdout + :param cwd: Working directory to run the command in. + If None (default), the command is run in a temporary directory. + :return: :class:`namedtuple` containing: + ``exit_code`` + ``output``: the last line from stderr or stdout + ``full_output``: all lines from stderr or stdout. + """ + self.log.info("Tmp dir root location: \n %s", gettempdir()) + log_lines = [] + with contextlib.ExitStack() as stack: + if cwd is None: + cwd = stack.enter_context(TemporaryDirectory(prefix="airflowtmp")) + + def pre_exec(): + # Restore default signal disposition and invoke setsid + for sig in ("SIGPIPE", "SIGXFZ", "SIGXFSZ"): + if hasattr(signal, sig): + signal.signal(getattr(signal, sig), signal.SIG_DFL) + os.setsid() + + self.log.info("Running command: %s", command) + + self.sub_process = Popen( + command, + stdout=PIPE, + stderr=STDOUT, + cwd=cwd, + env=env if env or env == {} else os.environ, + preexec_fn=pre_exec, + ) + + self.log.info("Output:") + line = "" + + if self.sub_process is None: + raise RuntimeError("The subprocess should be created here and is None!") + if self.sub_process.stdout is not None: + for raw_line in iter(self.sub_process.stdout.readline, b""): + line = raw_line.decode( + output_encoding, errors="backslashreplace" + ).rstrip() + # storing the warn & error lines to be used later 
+ log_lines.append(line) + self.log.info("%s", line) + + self.sub_process.wait() + + self.log.info( + "Command exited with return code %s", self.sub_process.returncode + ) + return_code: int = self.sub_process.returncode + + return FullOutputSubprocessResult( + exit_code=return_code, output=line, full_output=log_lines + ) + + def send_sigterm(self): + """Sends SIGTERM signal to ``self.sub_process`` if one exists.""" + self.log.info("Sending SIGTERM signal to process group") + if self.sub_process and hasattr(self.sub_process, "pid"): + os.killpg(os.getpgid(self.sub_process.pid), signal.SIGTERM) diff --git a/cosmos/providers/dbt/core/utils/tests/test_warn_parsing.py b/cosmos/providers/dbt/core/utils/tests/test_warn_parsing.py new file mode 100644 index 0000000000..f6e594330b --- /dev/null +++ b/cosmos/providers/dbt/core/utils/tests/test_warn_parsing.py @@ -0,0 +1,39 @@ +from airflow.hooks.subprocess import SubprocessResult + +from cosmos.providers.dbt.core.utils.warn_parsing import ( + extract_log_issues, + parse_output, +) + + +def test_parse_output() -> None: + for warnings in range(0, 3): + output_str = f"Done. 
def parse_output(result: SubprocessResult, keyword: str) -> int:
    """
    Parses the DBT test output message and returns the number of errors or warnings.

    :param result: Result object of the dbt command. Only its ``output`` attribute (the
        summary line, e.g. "Done. PASS=15 WARN=1 ERROR=0 SKIP=0 TOTAL=16") is read.
    :param keyword: String representing the keyword to search for in the output (WARN, ERROR).
    :return: An integer value associated with the keyword, or 0 if parsing fails.

    Usage:
    -----
    result = SubprocessResult(exit_code=0, output="Done. PASS=15 WARN=1 ERROR=0 SKIP=0 TOTAL=16")
    num_warns = parse_output(result, "WARN")
    print(num_warns)
    # Output: 1
    """
    output = result.output
    try:
        # "... WARN=1 ERROR=0 ..." -> "1 ERROR=0 ..." -> "1"
        return int(output.split(f"{keyword}=")[1].split()[0])
    except (IndexError, ValueError):
        # IndexError: the keyword is absent or nothing follows "=".
        # ValueError: the token after "=" is not an integer.
        logging.error(
            f"Could not parse number of {keyword}s. Check your dbt/airflow version or if --quiet is not being used"
        )
        return 0


def extract_log_issues(log_list: List[str]) -> Tuple[List[str], List[str]]:
    """
    Extracts the names and messages of the tests that raised a warning.

    The log is scanned backwards and the scan stops at the first "Finished running" line it
    meets. For every "Warning in test" line found, the test name is taken from that line and
    the message from the line that follows it in the log.

    :param log_list: List of strings, where each string is a log line from DBT test.
    :return: two lists of strings, the first one containing the test names and the second one
        containing the test results. Entries are paired by index and appear in reverse log
        order. The result is an empty string when the warning is the very last log line.
    """
    ansi_colour = re.compile(r"\x1b\[[0-9;]*m")
    warning_header = re.compile(r"\d{2}:\d{2}:\d{2}\s+Warning in test ([\w_]+).*")
    timestamped = re.compile(r"\d{2}:\d{2}:\d{2}\s+(.*)")

    def clean_line(line: str) -> str:
        # Drop terminal colour codes so the text can be matched and reported as plain text.
        return ansi_colour.sub("", line).strip()

    test_names: List[str] = []
    test_results: List[str] = []
    last_index = len(log_list) - 1

    for offset, line in enumerate(reversed(log_list)):
        cleaned_line = clean_line(line)

        if "Finished running" in cleaned_line:
            # No need to keep checking the log lines once all warnings are found
            break

        if "Warning in test" not in cleaned_line:
            continue

        # Position of this line when counting from the start of the log.
        index = last_index - offset
        # The message sits on the next log line; a warning on the last line has none.
        # (A negative index here would wrap around to the first line of the log.)
        following = log_list[index + 1] if index < last_index else ""

        test_names.append(warning_header.sub(r"\1", cleaned_line))
        test_results.append(timestamped.sub(r"\1", clean_line(following)))

    return test_names, test_results
except ImportError: from typing_extensions import Literal -from typing import Any, Dict, List +from typing import Any, Callable, Dict, List, Optional from cosmos.core.airflow import CosmosDag @@ -34,6 +34,8 @@ class DbtDag(CosmosDag): :param execution_mode: The execution mode in which the dbt project should be run. Options are "local", "docker", and "kubernetes". Defaults to "local" + :param on_warning_callback: A callback function called on warnings with additional Context variables "test_names" + and "test_results" of type `List`. Each index in "test_names" corresponds to the same index in "test_results". """ def __init__( @@ -50,6 +52,7 @@ def __init__( select: Dict[str, List[str]] = {}, exclude: Dict[str, List[str]] = {}, execution_mode: Literal["local", "docker", "kubernetes"] = "local", + on_warning_callback: Optional[Callable] = None, *args: Any, **kwargs: Any, ) -> None: @@ -73,6 +76,7 @@ def __init__( select=select, exclude=exclude, execution_mode=execution_mode, + on_warning_callback=on_warning_callback, ) # call the airflow DAG constructor diff --git a/cosmos/providers/dbt/render.py b/cosmos/providers/dbt/render.py index ae5237c4b1..cfde8afbda 100644 --- a/cosmos/providers/dbt/render.py +++ b/cosmos/providers/dbt/render.py @@ -9,7 +9,7 @@ except ImportError: from typing_extensions import Literal -from typing import Any, Dict, List +from typing import Any, Callable, Dict, List, Optional from airflow.exceptions import AirflowException @@ -41,6 +41,7 @@ def render_project( select: Dict[str, List[str]] = {}, exclude: Dict[str, List[str]] = {}, execution_mode: Literal["local", "docker", "kubernetes"] = "local", + on_warning_callback: Optional[Callable] = None, ) -> Group: """ Turn a dbt project into a Group @@ -59,6 +60,8 @@ def render_project( :param execution_mode: The execution mode in which the dbt project should be run. Options are "local", "docker", and "kubernetes". 
Defaults to "local" + :param on_warning_callback: A callback function called on warnings with additional Context variables "test_names" + and "test_results" of type `List`. Each index in "test_names" corresponds to the same index in "test_results". """ # first, get the dbt project project = DbtProject( @@ -127,7 +130,8 @@ def render_project( run_args: Dict[str, Any] = {**task_args, **operator_args, "models": model_name} test_args: Dict[str, Any] = {**task_args, **operator_args, "models": model_name} - + # DbtTestOperator specific arg + test_args["on_warning_callback"] = on_warning_callback if emit_datasets: outlets = [get_dbt_dataset(conn_id, dbt_project_name, model_name)] diff --git a/cosmos/providers/dbt/task_group.py b/cosmos/providers/dbt/task_group.py index a2690c1b6d..91f4a8bccd 100644 --- a/cosmos/providers/dbt/task_group.py +++ b/cosmos/providers/dbt/task_group.py @@ -6,7 +6,7 @@ except ImportError: from typing_extensions import Literal -from typing import Any, Dict, List +from typing import Any, Callable, Dict, List, Optional from cosmos.core.airflow import CosmosTaskGroup @@ -35,6 +35,8 @@ class DbtTaskGroup(CosmosTaskGroup): :param execution_mode: The execution mode in which the dbt project should be run. Options are "local", "docker", and "kubernetes". Defaults to "local" + :param on_warning_callback: A callback function called on warnings with additional Context variables "test_names" + and "test_results" of type `List`. Each index in "test_names" corresponds to the same index in "test_results". 
""" def __init__( @@ -52,6 +54,7 @@ def __init__( select: Dict[str, List[str]] = {}, exclude: Dict[str, List[str]] = {}, execution_mode: Literal["local", "docker", "kubernetes"] = "local", + on_warning_callback: Optional[Callable] = None, *args: Any, **kwargs: Any, ) -> None: @@ -76,6 +79,7 @@ def __init__( select=select, exclude=exclude, execution_mode=execution_mode, + on_warning_callback=on_warning_callback, ) # call the airflow constructor diff --git a/docs/_static/callback_slack.png b/docs/_static/callback_slack.png new file mode 100644 index 0000000000..1ef9452f6d Binary files /dev/null and b/docs/_static/callback_slack.png differ diff --git a/docs/dbt/configuration.rst b/docs/dbt/configuration.rst index 1fa8521528..0028f5ecb3 100644 --- a/docs/dbt/configuration.rst +++ b/docs/dbt/configuration.rst @@ -23,6 +23,64 @@ Example: test_behavior='after_all', ) +Warn Notification +---------------------- +.. note:: + + As of now, this feature is only available for the default execution mode ``local`` + +Cosmos enables you to receive warning notifications from tests and process them using a callback function. +The ``on_warning_callback`` parameter adds two extra context variables to the callback function: ``test_names`` and ``test_results``. +``test_names`` contains the names of the tests that generated a warning, while ``test_results`` holds the corresponding test results +at the same index. Both are List of strings. +Example: + +.. code-block:: python + + from cosmos.providers.dbt import DbtDag + from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook + + def warning_callback_func(context): + tests = context.get('test_names') + results = context.get('test_results') + + warning_msgs = "" + for test, result in zip(tests, results): + warning_msg = f""" + *Test*: {test} + *Result*: {result} + """ + warning_msgs += warning_msg + + if warning_msgs: + slack_msg = f""" + :large_yellow_circle: Airflow-DBT task with WARN. 
+ *Task*: {context.get('task_instance').task_id} + *Dag*: {context.get('task_instance').dag_id} + *Execution Time*: {context.get('execution_date')} + *Log Url*: {context.get('task_instance').log_url} + {warning_msgs} + """ + + slack_hook = SlackWebhookHook(slack_webhook_conn_id='slack_conn_id') + slack_hook.send(text=slack_msg) + + mrr_playbook = DbtDag( + # ... + on_warning_callback=warning_callback_func, + ) + +When at least one WARN message is present, the function passed to ``on_warning_callback`` will be triggered +and the following message will be sent to Slack in the example above: + +.. figure:: https://github.com/astronomer/astronomer-cosmos/raw/main/docs/_static/callback_slack.png + :width: 600 + +.. note:: + + If warnings that are not associated with tests occur (e.g. freshness warnings), they will still trigger the + ``on_warning_callback`` method above. However, these warnings will not be included in the ``test_names`` and + ``test_results`` context variables, which are specific to test-related warnings. Selecting and Excluding ----------------------